Files
cristallina/cristallina.py
2023-05-09 18:22:18 +02:00

174 lines
5.4 KiB
Python

#!/usr/bin/env python
import sys
import os
# Switch the working directory to the Cristallina slic application directory
# before the remaining imports run.
# NOTE(review): presumably required so that the sibling modules imported
# below by bare name, and any relative paths they use, resolve -- confirm.
os.chdir('/sf/cristallina/applications/slic/cristallina')
import re
# setup logging
import logging

logger = logging.getLogger("slic")
logger.setLevel(logging.INFO)

# Create the file handler which logs to the shared log file.  It is attached
# *before* the first message is emitted so that "Loading started." can reach
# the file as well.
try:
    fh = logging.FileHandler('/sf/cristallina/applications/slic/cristallina/log/cristallina.log')
except OSError as e:
    # OSError covers PermissionError as well as a missing log directory
    # (FileNotFoundError): an unusable log file must not abort start-up.
    logger.warning("Cannot write log file.")
    logger.warning(e)
else:
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)

logger.info("Loading started.")
from time import sleep
from tracemalloc import start
from epics import PV
from stand_client import Client
# Connection to the stand server (host saresc-vcons-02.psi.ch, port 9090).
# simple_acquisition() calls add_row() on it to register each recorded run.
stand_client = Client(host='saresc-vcons-02.psi.ch', port=9090)
from alignment_laser import AlignmentLaser
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.devices.xdiagnostics.intensitymonitor import IntensityMonitorPBPS
from slic.devices.general.motor import Motor
from slic.devices.xoptics.pulsepicker import PulsePicker
from slic.utils import devices, Marker, as_shortcut
from slic.utils import Channels, Config, Elog, Screenshot, PV
from slic.core.acquisition.fakeacquisition import FakeAcquisition
from slic.devices.timing.events import CTASequencer
from bs_channels import detectors, bs_channels, camera_channels
from pv_channels import pvs
from spreadsheet import overview
from channels_minimal import detectors_min, channels_min, pvs_min
from pp_shutter import PP_Shutter
# DEVICES
dummy = DummyAdjustable(units="au")

# Attenuators
from slic.devices.xoptics.aramis_attenuator import Attenuator
from knife_edge import KnifeEdge
from standa import standa

# Self-assignment has no effect at runtime; kept as in the original.
standa = standa

Newport_large = Motor("SARES30-MOBI1:MOT_5")

attenuator = Attenuator(
    "SAROP31-OATA150",
    description="Cristallina attenuator OATA150",
)
front_end_attenuator = Attenuator(
    "SARFE10-OATT053",
    description="Front end attenuator OATT053",
)

cta = CTASequencer("SAR-CCTA-ESC")

# Intensity/position monitors.  The vme_crate/link arguments are left
# disabled until they have been checked.
pbps113 = IntensityMonitorPBPS(
    "SAROP31-PBPS113",
    # vme_crate="SAROP31-CVME-PBPS1", # please check this!
    # link=9,
    description="Intensity/position monitor in the optics hutch",
)
pbps149 = IntensityMonitorPBPS(
    "SAROP31-PBPS149",
    # vme_crate="SAROP31-CVME-PBPS2", # please check this!
    # link=9,
    description="Intensity/position monitor in the experimental hutch",
)
def test_attenuator():
    """Sanity-check that the Cristallina attenuator reports a usable transmission.

    Reads the transmission from the module-level ``attenuator`` and compares
    it against zero.  If the value cannot be compared with a number (the
    comparison raises ``TypeError``, e.g. for ``None``), a warning naming the
    attenuator is logged instead of failing.

    Raises:
        AssertionError: if a value is reported but is not greater than zero.
    """
    tfundamental = attenuator.get_transmission()
    # NOTE(review): this assumes get_transmission() returns a single number;
    # if it returns several values (a tuple) the comparison below always
    # raises TypeError and only the warning is logged -- confirm.
    try:
        is_positive = tfundamental > 0
    except TypeError:
        # The original message lacked the f-prefix and logged the literal
        # text "{attenuator.ID}"; pass the ID as a logging argument instead.
        logger.warning("No transmission value reported from %s", attenuator.ID)
        return
    if not is_positive:
        # Raised explicitly (rather than via ``assert``) so that the check
        # is not stripped when Python runs with -O.
        raise AssertionError(
            f"Transmission of {attenuator.ID} is not positive: {tfundamental!r}"
        )
# Run the attenuator check once while the module loads.
test_attenuator()

# Undulators
import undulator

undulators = undulator.Undulators()

# Shutter (shutter button when using the pulse picker)
shutter = PP_Shutter(
    "SARES30-LTIM01-EVR0:RearUniv0-Ena-SP",
    name="Cristallina pulse picker shutter",
)
pulsepicker = PulsePicker(
    "SAROP31-OPPI151",
    "SARES30-LTIM01-EVR0:Pul3",
    name="Cristallina X-ray pulse picker OPPI151",
)

# Alignment laser
alaser = AlignmentLaser(
    "SAROP31-OLAS147:MOTOR_1",
    name="Cristallina alignment laser OLAS147",
)
## Slits
from slic.devices.xoptics import slits
## Smaract & attocube stages
from smaract_device_def import smaract_Juraj, smaract_mini_XYZ
from attocube_device_def import attocube
from jj_device_def import jjslits
# KB mirrors (horizontal and vertical)
from slic.devices.xoptics.kb import KBHor, KBVer

kbHor = KBHor("SAROP31-OKBH154", description="Cristallina horizontal KB mirror")
kbVer = KBVer("SAROP31-OKBV153", description="Cristallina vertical KB mirror")
###########################################
# Instrument and pgroup handed to the acquisition below.
instrument = "cristallina"

# Earlier pgroups are kept as comments for reference; exactly one
# assignment is active.
# pgroup = "p19739" # commissioning March 2022 -- July 2022
# pgroup = "p20443" # commissioning Wavefront Sensor August 2022 (active)
# pgroup = "p20558" # SwissMX commissioning 3
# pgroup = "p20557" # CrQ PMS commissioning 1
# pgroup = "p20509" # CrQ commissioning DilSc1
# pgroup = "p20519" # beamline commissioning 2
# pgroup = "p20840" # Cr beamline commissioning (Jan-Feb 2023)
# pgroup = "p20841" # CrQ PMS commissioning 2 (Jan 2023)
pgroup = "p20993"  # CrQ commissioning DilSc2 (March 2023)
# pgroup = "p19150" # Scratch
# pgroup = "p19152" # Scratch

# Default acquisition, using the channel, PV and detector lists imported above.
daq = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pvs,
    default_detectors=detectors,
    rate_multiplicator=1,
)
# daq = FakeAcquisition(instrument, pgroup)
# daqPV = PVAcquisition(instrument, pgroup, default_channels=channels_ks) # workaround for KS not going to DB
# scan = Scanner(scan_info_dir=f"/sf/{instrument}/data/{pgroup}/res/scan_info", default_acquisitions=[daq], condition=None)

# Run the scan only when the gas monitor value is larger than 10 uJ (and
# smaller than 2000 uJ).  required_fraction defines the amount of data that
# has to be recorded to save the step and move on to the next one.
check_intensity_gas_monitor = PVCondition(
    "SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
    vmin=10,
    vmax=2000,
    wait_time=0.5,
    required_fraction=0.8,
)

scan = Scanner(
    default_acquisitions=[daq],
    condition=check_intensity_gas_monitor,
)
gui = GUI(scan, show_goto=True, show_spec=True)

logger.info(f"Running at {instrument} with pgroup {pgroup}.")
logger.info("Loading finished.")
def simple_acquisition(filename):
    """Acquire data with the default DAQ and register the run with the stand client.

    The run number is taken from the first file name reported by the
    acquisition task (``run`` followed by four digits) and passed to
    ``stand_client.add_row``.

    Args:
        filename: name handed on to ``daq.acquire``.

    Raises:
        ValueError: if the first reported file name does not contain
            ``run`` followed by a four-digit run number.
    """
    task = daq.acquire(filename)
    fname = task.filenames[0]

    match = re.search(r"run([0-9]{4})", fname)
    if match is None:
        # Without this guard the failure surfaces as an opaque
        # "'NoneType' object has no attribute 'groups'".
        raise ValueError(
            f"Cannot extract run number from acquired file name {fname!r}"
        )

    run_number = int(match.group(1))
    stand_client.add_row(run_number=run_number)