Files
cristallina/cristallina.py

168 lines
5.5 KiB
Python

#!/usr/bin/env python
import logging
from tracemalloc import start
logger = logging.getLogger("slic")
logger.setLevel(logging.INFO)
logger.info("Loading started.")
from time import sleep
from datetime import datetime
import numpy as np
import os
os.chdir('/sf/cristallina/applications/slic/cristallina')
# from tqdm import trange
from epics import PV
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
from slic.devices.general.shutter import Shutter
from slic.devices.xoptics.pulsepicker import PulsePicker
from slic.utils import devices, Marker, as_shortcut
from slic.utils import Channels, Config, Elog, Screenshot, PV
from slic.core.acquisition.fakeacquisition import FakeAcquisition
from slic.devices.timing.events import CTASequencer
from channels import detectors, channels, pvs
from spreadsheet import overview
from channels_minimal import detectors_min, channels_min, pvs_min
from pp_shutter import PP_Shutter
# DEVICES
## example devices and adjustables
from cool_motor import MyNewCoolThing
cool_motor = MyNewCoolThing("cool_motor")
dummy = DummyAdjustable(units="au")
## Attenuators
from slic.devices.xoptics.aramis_attenuator import Attenuator
from knife_edge import KnifeEdge
attenuator = Attenuator("SAROP31-OATA150", description="Cristallina attenuator OATA150")
front_end_attenuator = Attenuator("SARFE10-OATT053", description="Front end attenuator OATT053")
cta = CTASequencer("SAR-CCTA-ESC")
def test_attenuator():
tfundamental = attenuator.get_transmission()
try:
assert tfundamental > 0
except TypeError:
print("No transmission value reported from {attenuator.ID}")
#test_attenuator()
## Undulator
import undulator
undulators = undulator.Undulators()
# Shutter
shutter = PP_Shutter("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP",name="Cristallina pulse picker shutter") # Shutter buttton when using the pulse picker
pulsepicker = PulsePicker("SAROP31-OPPI151","SARES30-LTIM01-EVR0:Pul3", name="Cristallina X-ray pulse picker OPPI151")
## Slits
from slic.devices.xoptics import slits
## Smaract & attocube stages
from smaract_device_def import smaract
from attocube_device_def import attocube
# KBs
from slic.devices.xoptics.kb import KBBase,KBHor,KBVer
kbHor = KBHor(
"SAROP31-OKBH154",
description="Cristallina horizontal KB mirror"
)
kbVer = KBVer(
"SAROP31-OKBV153",
description="Cristallina vertical KB mirror"
)
###########################################
instrument = "cristallina"
# pgroup = "p19739" # commissioning March 2022 -- July 2022
# pgroup = "p20443" # commissioning Wavefront Sensor August 2022 (active)
# pgroup = "p20558" # SwissMX commissioning 3
# pgroup = "p20557" # CrQ PMS commisioning 1
# pgroup = "p20509" # CrQ commissoing DilSc1
# pgroup = "p20519" # beamline commissioning 2
pgroup = "p20841" # CrQ PMS commisioning 2 (Jan 2023)
# pgroup = "p19150" # Scratch
# pgroup = "p19152" # Scratch
daq = SFAcquisition(
instrument,
pgroup,
default_channels=channels ,
default_pvs=pvs,
default_detectors=detectors,
rate_multiplicator=1,
)
# daq = FakeAcquisition(instrument, pgroup)
# daqPV = PVAcquisition(instrument, pgroup, default_channels=channels_ks) # workaround for KS not going to DB
# scan = Scanner(scan_info_dir=f"/sf/{instrument}/data/{pgroup}/res/scan_info", default_acquisitions=[daq], condition=None)
# Run the scan only when gas monitor value larger than 10uJ (and smaller than 2000uJ):
check_intensity_gas_monitor = PVCondition("SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US", vmin=10, vmax=2000, wait_time=0.5, required_fraction=0.8) # required fraction defines ammount of data recorded to save the step and move on to the next one
scan = Scanner(default_acquisitions=[daq],condition = check_intensity_gas_monitor)
gui = GUI(scan, show_goto=True, show_spec=True)
logger.info(f"Running at {instrument} with pgroup {pgroup}.")
logger.info("Loading finished.")
# For PMS pulsing. Temporarily placed here, should be a method of the pulsed magnet device once created.
import sys
sys.path.append('/sf/jungfrau/applications/daq_client')
import daq_client as sf_daq_client
CTA_sequence_start_PID = PV("SAR-CCTA-ESC:seq0Ctrl-StartedAt-O")
# Number of pulses to ask from the SFDAQ. This will depend on the CTA sequence.
npulses=10
def pulse(run_comment=None):
# Start the CTA sequence
cta.cta_client.start()
# Wait until done
sleep(3)
# Ask for the first pid in the sequence
start_PID=int(CTA_sequence_start_PID.get())
# Data retrieval does not allow empty list, so creating a new variable
if detectors == [] or detectors == None:
used_detectors = {}
else:
used_detectors = detectors
# Ask the DAQ client to download the data
run_number = sf_daq_client.retrieve_data_from_buffer(pgroup=pgroup, user_tag=run_comment,
camera_channels=[], bsread_channels=channels, epics_channels=pvs,
detectors=used_detectors,
start_pulseid=start_PID, stop_pulseid=start_PID+npulses,
rate_multiplicator=1,
)
print('\n')
print(f' Magnet pulse done.\n Run {run_number} created. \n First PID in the CTA sequence: {start_PID}\n')