#!/usr/bin/env python
import sys
import os

# at the moment this allows us to group the subdirectories as modules easily
# TODO: a more general way would be to have this cristallina as an installed package
os.chdir("/sf/cristallina/applications/slic/cristallina")

# setup logging
from loguru import logger

# Record layout shared by every sink configured in this file.
_LOG_FORMAT = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"


def setup_logging():
    """Configure the loguru sinks used during startup.

    Removes the handlers registered so far, adds an INFO-level sink on
    stderr and then tries to add a DEBUG-level, weekly rotated file sink.
    A ``PermissionError`` while adding the file sink is reported as a
    warning and otherwise ignored, so logging continues on stderr only.
    """
    logger.remove()
    logger.add(sys.stderr, format=_LOG_FORMAT, level="INFO")

    # create file handler which logs
    # TODO: better directory for general log files?
    try:
        logger.add(
            "/sf/cristallina/applications/slic/cristallina/log/cristallina.log",
            format=_LOG_FORMAT,
            level="DEBUG",
            rotation="1 week",
        )
    except PermissionError as e:
        logger.warning("Cannot write log file.")
        logger.warning(e)

    # Emitted only after the file sink has been attempted, so that the start
    # marker also ends up in the log file (when that file is writable).
    logger.info("Loading started.")


def setup_logging_pgroup(pgroup, level="INFO"):
    """Add a weekly rotated log file sink in the scratch area of a pgroup.

    Args:
        pgroup: pgroup identifier, interpolated into
            ``/sf/cristallina/data/{pgroup}/scratch/slic.log``.
        level: minimum level written to that file (default ``"INFO"``).

    A ``PermissionError`` while adding the sink is reported as a warning,
    together with the error itself, and otherwise ignored.
    """
    try:
        logger.add(
            f"/sf/cristallina/data/{pgroup}/scratch/slic.log",
            format=_LOG_FORMAT,
            level=level,
            rotation="1 week",
        )
    except PermissionError as e:
        logger.warning(f"Cannot write log file to pgroup {pgroup}.")
        # Report the reason as well, as setup_logging() does.
        logger.warning(e)


# We setup the logging before going further so that
# other modules can write startup messages into the log file.
setup_logging()

# TODO: this is later overwritten, probably not intended that way?
# from epics import PV
from devices.alignment_laser import AlignmentLaser
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.devices.xdiagnostics.intensitymonitor import IntensityMonitorPBPS
from slic.devices.general.motor import Motor
from slic.devices.xoptics.pulsepicker import PulsePicker
from slic.utils import devices, Marker, as_shortcut
from slic.utils import Channels, Config, Elog, Screenshot, PV
from slic.core.acquisition.fakeacquisition import FakeAcquisition
from slic.devices.timing.events import CTASequencer

from channels.bs_channels import (
    detectors,
    detectors_MX,
    bs_channels,
    camera_channels,
)
from channels.pv_channels import pvs
from spreadsheet import overview

# from channels_minimal import detectors_min, channels_min, pvs_min
from devices.pp_shutter import PP_Shutter

################# DEVICES #################
dummy = DummyAdjustable(units="au")

# Attenuators
from slic.devices.xoptics.aramis_attenuator import Attenuator
from devices.knife_edge import KnifeEdge
from devices.standa import standa
from devices.newport import newport

Newport_large = Motor("SARES30-MOBI1:MOT_5")
OWIS = Motor("SARES30-MOBI1:MOT_6")  # small OWIS linear stage

attenuator = Attenuator("SAROP31-OATA150", description="Cristallina attenuator OATA150")
upstream_attenuator = Attenuator("SARFE10-OATT053", description="Aramis attenuator OATT053")


def test_attenuator():
    """Check that the Cristallina attenuator reports a positive transmission.

    Reads ``attenuator.get_transmission()`` and asserts that the value is
    greater than zero. If the comparison itself raises ``TypeError``
    (presumably because no numeric value was returned), a warning naming the
    attenuator is logged instead.

    Raises:
        AssertionError: if a value is reported but it is not greater than 0.
    """
    tfundamental = attenuator.get_transmission()
    try:
        assert tfundamental > 0
    except TypeError:
        # f-string so that the attenuator ID is actually interpolated.
        logger.warning(f"No transmission value reported from {attenuator.ID}")


# test_attenuator()

front_end_attenuator = Attenuator(
    "SARFE10-OATT053", description="Front end attenuator OATT053"
)

cta = CTASequencer("SAR-CCTA-ESC")

pbps113 = IntensityMonitorPBPS(
    "SAROP31-PBPS113",
    # vme_crate="SAROP31-CVME-PBPS1",  # please check this!
    # link=9,
    description="Intensity/position monitor in the optics hutch",
)

pbps149 = IntensityMonitorPBPS(
    "SAROP31-PBPS149",
    # vme_crate="SAROP31-CVME-PBPS2",  # please check this!
    # link=9,
    description="Intensity/position monitor in the experimental hutch",
)

# Undulators
from beamline import undulator

undulators = undulator.Undulators()
logger.info(
    f"Using undulator (Aramis) offset to PSSS energy of {undulator.energy_offset} eV."
)

# Shutter
shutter = PP_Shutter(
    "SARES30-LTIM01-EVR0:RearUniv0-Ena-SP", name="Cristallina pulse picker shutter"
)  # Shutter button when using the pulse picker
pulsepicker = PulsePicker(
    "SAROP31-OPPI151",
    "SARES30-LTIM01-EVR0:Pul3",
    name="Cristallina X-ray pulse picker OPPI151",
)

# Alignment laser
alignment_laser = AlignmentLaser(
    "SAROP31-OLAS147:MOTOR_1", name="Cristallina alignment laser OLAS147"
)

## Slits
from slic.devices.xoptics import slits

## Smaract & attocube stages
from devices.smaract_device_def import smaract_Juraj, smaract_mini_XYZ
from devices.attocube_device_def import attocube
from devices.jj_device_def import jjslits

# KBs
from slic.devices.xoptics.kb import KBHor, KBVer

kbHor = KBHor("SAROP31-OKBH154", description="Cristallina horizontal KB mirror")
kbVer = KBVer("SAROP31-OKBV153", description="Cristallina vertical KB mirror")

# Bernina monochromator
from beamline.bernina_mono import BerninaMono

BerninaDCM = BerninaMono("SAROP21-ODCM098")

# Diffractometer
from devices.diffractometer import Diffractometer

diffractometer = Diffractometer("diffractometer")

from devices.dilsc import Dilution

dilution = Dilution()

################# Stand setup ##################
# TODO: requires the stand client, need small howto how to start and configure or let it run all the time

from slic.core.acquisition.spreadsheet import Spreadsheet

# Address of the stand server. Defined before the try block below so that the
# warning in its except branch can always name the host, even when the
# stand_client import itself is what failed.
stand_host = "saresc-vcons-02.psi.ch"
stand_port = 9090

# setup spreadsheet for transmission to stand
spreadsheet = Spreadsheet(
    {
        "Transmission": attenuator.trans1st,
        "Upstream Transmission": upstream_attenuator.trans1st,
        "Energy_setpoint": undulators,
        "Energy_offset": undulator.energy_offset,
        "TD": diffractometer.td,
        "TRX": diffractometer.tr_x,
        "TRY": diffractometer.tr_y,
        "TRXBASE": diffractometer.trx_base,
        "TRYBASE": diffractometer.try_base,
        "THETA": diffractometer.theta,
        "TWOTHETA": diffractometer.twotheta,
        "Magnet_X": dilution.x,
        "Magnet_Y": dilution.y,
        "Magnet_Z": dilution.z,
        "DilSc_T_plato": dilution.T_plato,
        "DilSc_T_pucksensor": dilution.T_pucksensor,
    },
    placeholders=("comment", "sample", "run_usable"),
    host=stand_host,
    port=stand_port,
)

# Probe the stand server once; if anything goes wrong the spreadsheet is
# disabled (set to None) and the DAQ below is created without it.
try:
    from stand_client import Client

    stand_client = Client(host=stand_host, port=stand_port)
    response = stand_client.get()
    logger.info("Connected to stand server")
except Exception as error:
    # catching with a broad net because different connection errors can occur.
    logger.warning(f"Cannot connect to stand server on {stand_host}. Disabling spreadsheet.")
    # Keep the actual reason available at debug level.
    logger.debug(f"Stand connection error: {error!r}")
    spreadsheet = None

# spreadsheet = Spreadsheet(adjs, placeholders=PLACEHOLDERS, host="127.0.0.1", port=8080))

################# DAQ Setup #################
instrument = "cristallina"

# pgroup = "p19739"  # commissioning March 2022 -- July 2022
# pgroup = "p20443"  # commissioning Wavefront Sensor August 2022 (active)
# pgroup = "p20558"  # SwissMX commissioning 3
# pgroup = "p20557"  # CrQ PMS commissioning 1
# pgroup = "p20509"  # CrQ commissioning DilSc1
# pgroup = "p20519"  # beamline commissioning 2
# pgroup = "p20841"  # CrQ PMS commissioning 2 (Jan 2023)
# pgroup = "p20993"  # CrQ commissioning DilSc2 (March 2023)
# pgroup = "p21147"  # SAXS
# pgroup = "p21238"  # Cristallina photon diagnostics p-group with Chris
# pgroup = "p21224"  # SwissMX commissioning 7
# pgroup = "p19150"  # Scratch
# pgroup = "p19152"  # Scratch
# pgroup = "p20840"  # Cr beamline commissioning (Jan-Feb 2023)
# pgroup = "p21261"  # CrQ PMS-3 July 2023
# pgroup = "p21528"  # Cr-MX Colletier 2023-09-05
# pgroup = "p21516"  # Beamline commissioning September 26-27, November 7 2023
# pgroup = "p21563"  # Dil-Sc / diffractometer / tilted bunch / LiErF4 (/ TmVO4)
# pgroup = "p21569"  # Dil-Sc / diffractometer / tilted bunch / LiErF4 (/ TmVO4), November 17-
# pgroup = "p21592"  # HVE commissioning
# pgroup = "p21640"  # Dil-Sc / diffractometer / LiErF4 : 4 March 2024
# pgroup = "p21920"  # Beamline commissioning 2024
pgroup = "p21741"  # CrQ - DilSc - SAXS LiHoF4

# setup pgroup specific logger
setup_logging_pgroup(pgroup)

daq = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pvs,
    default_detectors=detectors,
    rate_multiplicator=1,
    spreadsheet=spreadsheet,
)

# There is a new EPICS buffer, so the archiver is no longer used. This makes sure we are taking PVs from the right place.
daq.update_config_pvs()

from acquisition import multiple_daqs

DAQS = multiple_daqs.generate_DAQS(instrument, pgroup, bs_channels, pvs, detectors)

# daq = FakeAcquisition(instrument, pgroup)
# daqPV = PVAcquisition(instrument, pgroup, default_channels=channels_ks) # workaround for KS not going to DB

# Run the scan only when the gas monitor value lies between vmin and vmax
# (4 and 2000 below; the original note gives the unit as uJ).
# required_fraction defines the amount of data that has to be recorded to save
# the step and move on to the next one.
check_intensity_gas_monitor = PVCondition(
    "SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
    vmin=4,
    vmax=2000,
    wait_time=0.5,
    required_fraction=0.8,
)

scan = Scanner(default_acquisitions=[daq], condition=check_intensity_gas_monitor)
gui = GUI(scan, show_goto=True, show_spec=True)

logger.info(f"Running at {instrument} with pgroup {pgroup}.")
logger.info("Loading finished.")