Files
alvra/alvra.py

274 lines
9.5 KiB
Python
Executable File

#!/usr/bin/env python
from slic.core.acquisition import SFAcquisition
from slic.core.adjustable import PVAdjustable
#from slic.core.acquisition import BSAcquisition, DBAcquisition, DIAAcquisition, PVAcquisition, SFAcquisition
#from slic.core.acquisition.bschannels import BSChannels
from slic.core.scanner import Scanner
from slic.core.acquisition import BSChannels, PVChannels, DetectorConfig
from slic.core.acquisition.fakeacquisition import FakeAcquisition
from slic.core.adjustable import DummyAdjustable
from slic.core.sensor import PVSensor
from slic.utils import Config, Elog, Screenshot, load_channels
from slic.core.condition import PVCondition
from slic.gui import GUI
from slic.core.acquisition.spreadsheet import Spreadsheet
from slic.utils import devices
from slic.utils import Marker, as_shortcut
from devices import *
from adhoc import *
#TODO: why do we need this suddenly?
# Best-effort: turn off jedi-based completion in the running IPython shell.
# When IPython is missing, or get_ipython() does not return a shell object,
# the attribute access/import fails and the setting is simply skipped.
try:
    from IPython import get_ipython
    get_ipython().Completer.use_jedi = False
except Exception:
    # Deliberately ignored (optional tweak). Exception instead of a bare
    # except, so KeyboardInterrupt and SystemExit still propagate.
    pass
#fn_cfg = "/sf/alvra/config/exp/current_experiment.json"
# Path of the JSON configuration file that Config is built from.
fn_cfg = "/sf/alvra/config/src/python/slic/alvra/config/config.json"
cfg = Config(fn_cfg)
# Elog and Screenshot both use cfg.screenshot_directory. URL and user come from cfg;
# the password is a hard-coded literal (marked as fake by the TODO on the line).
elog = Elog(cfg.elog_url, cfg.screenshot_directory, user=cfg.user, password="supercorrect") #TODO: remove fake password
screenshot = Screenshot(cfg.screenshot_directory)
# Files holding the default channel, detector and PV lists.
fn_channels = "/sf/alvra/config/com/channel_lists/default_channel_list"
fn_detectors = "/sf/alvra/config/com/detector_lists/default_detector_list"
fn_pvs = "/sf/alvra/config/com/channel_lists/PV_default_channel_list"
channels = BSChannels.from_file(fn_channels)
# NOTE(review): `detectors` is rebound further down (detectors = detectors_config),
# so the list loaded here is not the one handed to the acquisitions.
detectors = load_channels(fn_detectors)
pvs = PVChannels.from_file(fn_pvs)
## For Phil:
#fn_channels = "/sf/alvra/config/com/channel_lists/laser_channel_list_BS"
#fn_pvs = "/sf/alvra/config/com/channel_lists/laser_channel_list_PV"
#
#channels = BSChannels.from_file(fn_channels)
#detectors = None
#pvs = PVChannels.from_file(fn_pvs)
#detectors_raw = DetectorConfig("JF06T32V02", "JF02T09V03")
# Two DetectorConfig objects: detectors_raw is left empty, detectors_config gets
# exactly one detector entry below. Commented-out .add() lines are alternative settings.
detectors_raw = DetectorConfig()
detectors_config = DetectorConfig()
#detectors_config.add("JF06T32V02", remove_raw_files=True)
#detectors_config.add("JF06T32V07", adc_to_energy=True, geometry=True, compression=True, double_pixels_action='mask', factor=7.1, remove_raw_files=True) ## this is for LaB6
# Currently active entry (labelled "CrystFEL" by its trailing comment).
detectors_config.add("JF06T32V07", adc_to_energy=True, compression=True, crystfel_lists_laser=True, double_pixels_action='mask', factor=8.0, remove_raw_files=True, save_dap_results=True) ## this is for CrystFEL
#detectors_config.add("JF06T32V07", adc_to_energy=True, geometry=True, compression=True, double_pixels_action='mask', factor=0.25, downsample=(2,2), remove_raw_files=True) ## this is for XSS
#detectors_config.add("JF06T08V06")
#detectors_config.add("JF06T08V07", adc_to_energy=True, geometry=True, compression=True, double_pixels_action='mask', factor=8.01, remove_raw_files=True) ## this is for LaB6
#detectors_config.add("JF06T08V07", adc_to_energy=True, compression=True, crystfel_lists_laser=True, double_pixels_action='mask', factor=8.01, remove_raw_files=True, save_dap_results=True) ## this is for CrystFEL
#detectors_config.add("JF02T09V03", disabled_modules=[0, 3, 4, 7, 8], remove_raw_files=True)
#detectors_config.add("JF02T09V03")
#detectors_config.add("JF02T09V03", adc_to_energy=True, geometry=True, roi={
#'LbUP1':[280, 330, 5800, 6600],
#'LbDW1':[160, 210, 5800, 6600],
#'LbUP2':[280, 360, 2600, 3400],
#'LbDW2':[140, 220, 2700, 3500],
#'LbUP1_bkg':[350, 400, 5800, 6600],
#'LbDW1_bkg':[90, 140, 5800, 6600],
#'LbUP2_bkg':[380, 460, 2600, 3400],
#'LbDW2_bkg':[40, 120, 2700, 3500]
#})
#detectors = None
# Pick the detector configuration that is passed as default_detectors to the
# acquisition objects created later in this file.
detectors = detectors_config
#detectors = detectors_raw
#check_intensity = PVCondition("SARFE10-PBPG050:HAMP-INTENSITY-CAL", vmin=0, vmax=1500, wait_time=0.25, required_fraction=0.8)#
# Condition object handed to the Scanner instances below via condition=.
# The commented-out variants around this line use other PVs and limits.
check_intensity = PVCondition("SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US", vmin=50, vmax=2000, wait_time=0.5, required_fraction=0.9)
#check_intensity = PVCondition("SARFE10-PBPS053:INTENSITY", vmin=0.05,vmax=10, wait_time=0.5, required_fraction=0.8)
#check_intensity = PVCondition("SAROP11-PBPS110:INTENSITY", vmin=0.05,vmax=20, wait_time=0.5, required_fraction=0.8)
#check_intensity = PVCondition("SAROP11-PBPS110:INTENSITY", vmin=0.1, vmax=10, wait_time=0.5, required_fraction=0.9)
#check_intensity = PVCondition("SAROP11-PBPS122:INTENSITY", vmin=0.1, vmax=10, wait_time=0.5, required_fraction=0.9)
#check_intensity = None
from datetime import datetime
class Date:
    """Object whose string form is the current date (from datetime.now())."""

    def __str__(self):
        # str(datetime) is "<date> <time>"; keep what precedes the space
        date_text, _sep, _time_text = str(datetime.now()).partition(" ")
        return date_text
class Time:
    """Object whose string form is the current time of day (from datetime.now())."""

    def __str__(self):
        # str(datetime) is "<date> <time>"; keep what follows the space
        _date_text, _sep, time_text = str(datetime.now()).partition(" ")
        return time_text
# Mapping of column name -> value source, passed to Spreadsheet as adjs=.
# Entries are either objects (Date()/Time() stringify to the current date/time,
# mono_und comes from outside this file's visible code, a PVAdjustable) or fixed literals.
spear_adjs = {
"Date": Date(),
"Time": Time(),
"Energy": mono_und,
"Laser Wavelength": 600,
"Sample": "Cu Ni",
"Edge": "Ni K",
"Concentration": "2 mM",
"Topas Delay": PVAdjustable("SLAAR01-LTIM-PDLY:SYS_MOVED"),
#"vOD Filter": laser.motion.vOD_filter,
}
# Spreadsheet connection settings plus two placeholder names in addition to spear_adjs.
spreadsheet = Spreadsheet(
host="saresa-vcons-01.psi.ch", port=9090, adjs=spear_adjs,
placeholders=("Laser Energy", 'Comment')
)
#daq = SFAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels, default_pvs=list(pvs), default_detectors=detectors, api_address='http://sf-daq-8:10002', append_user_tag_to_data_dir=True, rate_multiplicator=1, spreadsheet=spreadsheet) #, cell_file="lyso"
# Main acquisition object: instrument and pgroup come from cfg, defaults are the
# channels / PVs / detectors defined above, and the spreadsheet object is attached.
daq = SFAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels, default_pvs=list(pvs), default_detectors=detectors, append_user_tag_to_data_dir=True, rate_multiplicator=1, spreadsheet=spreadsheet) #, cell_file="lyso"
#daq = SFAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels, default_pvs=None, default_detectors=detectors, append_user_tag_to_data_dir=True, rate_multiplicator=1, spreadsheet=spreadsheet) #, cell_file="lyso"
# Called once right after construction.
daq.update_config_pvs()
#daq.client.address = 'http://sf-daq-8:10002'
# Scanner using daq as its only default acquisition and check_intensity as condition;
# the scan info directory is built from cfg.pgroup.
scan = Scanner(
scan_info_dir="/sf/alvra/data/{}/res/scan_info".format(cfg.pgroup),
# scan_info_dir="./",
default_acquisitions=[daq],
condition=check_intensity,
default_sensor=None
)
# Three GUI objects on the same scanner, differing only in the options passed.
gui = GUI(scan, show_spec=True, show_goto=True, show_run=True, show_sfx=True, extras={"Mono": "Special"})
sfx_gui = GUI(scan, show_static=False, show_scan=False, show_spec=False, show_scan2D=False, show_tweak=True, show_goto=True, show_run=False, show_sfx=True, title="SFX GUI")
pedestal_gui = GUI(scan, tabs={}, title="Pedestal GUI")
# NOTE(review): Deprecator is not imported by name in this file; presumably it comes
# from one of the star imports. Each call pairs an old name with a current one
# ("daq", "scan") -- presumably to point users of the old names to the new ones; confirm.
bsdaqJF = Deprecator("bsdaqJF", "daq")
scansJF = Deprecator("scansJF", "scan")
#daqBS = BSAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels)
#daqDB = DBAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels)
#daqPV = PVAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels)
#scanBS = Scanner(
## scan_info_dir="/sf/alvra/data/{}/res/scan_info".format(cfg.pgroup),
# scan_info_dir="./",
# default_acquisitions=[daqBS],
#)
class PVAdjustableWithGo(PVAdjustable):
    """PVAdjustable that can additionally write 1 to a second ("go") PV on every set.

    pvname_go -- name of the go PV; with None (default) no go adjustable is
    created and set_target_value only runs the parent implementation.
    All other arguments are forwarded unchanged to PVAdjustable.
    """

    def __init__(self, *args, pvname_go=None, **kwargs):
        super().__init__(*args, **kwargs)
        if pvname_go is None:
            self.adj_go = None
        else:
            self.adj_go = PVAdjustable(pvname_go)

    def set_target_value(self, value):
        # parent first; its return value is not used
        super().set_target_value(value)
        go = self.adj_go
        if go is None:
            return
        # set the go adjustable to 1 and call wait() on what that returns
        go.set_target_value(1).wait()
#att_ns = PVAdjustableWithGo("SLAAR03-PLAS-EKSPLA:HOLD_ATTN", pvname_go="SLAAR03-PLAS-EKSPLA:EXE_SET_ATTN.PROC", name="OPO transmission")
from CTAstuff import CCAcquisition, ccta
# Second acquisition/scanner/GUI set built on CCAcquisition (imported from CTAstuff),
# reusing the same channels, PVs, detectors and condition as the main setup.
ccdaq = CCAcquisition(cfg.instrument, cfg.pgroup, default_channels=channels, default_pvs=list(pvs), default_detectors=detectors, append_user_tag_to_data_dir=True)
ccscan = Scanner(
scan_info_dir="/sf/alvra/data/{}/res/scan_info".format(cfg.pgroup),
default_acquisitions=[ccdaq],
condition=check_intensity,
)
ccgui = GUI(ccscan, show_spec=True, show_goto=True, show_run=True)
# Stand-alone DummyAdjustable instance made available under the name `dummy`.
dummy = DummyAdjustable("A Dummy Adjustable")
def huber_pos(x, y, z, att=None):
    """Set primeSample x/y/z (and optionally attExp) with XrayShutter closed.

    x, y, z -- targets for primeSample.x, .y and .z
    att     -- if not None, attExp is set to this value as well

    The shutter is closed first and opened again after every set() result
    has been waited for. There is no try/finally: if a set() or wait()
    raises, the shutter is not reopened by this function.
    """
    print(f"x = {x}, y = {y}, z = {z}, att = {att}")
    XrayShutter.close()
    # issue all set() calls first (x, then y, then z), wait afterwards
    pending = [
        getattr(primeSample, axis).set(target)
        for axis, target in zip("xyz", (x, y, z))
    ]
    if att is not None:
        pending.append(attExp.set(att))
    for task in pending:
        task.wait()
    print("moving done")
    XrayShutter.open()
    print("done")
@as_shortcut
def Pscreen():
    """Shortcut: call huber_pos with the fixed position x=0, y=0, z=-2.5."""
    huber_pos(x=0, y=0, z=-2.5)
@as_shortcut
def SiN():
    """Shortcut: call huber_pos with the fixed position x=13, y=-9, z=-1.1."""
    huber_pos(x=13, y=-9, z=-1.1)
@as_shortcut
def ThinYAG():
    """Shortcut: call huber_pos with the fixed position x=16.6, y=-2.5, z=0."""
    huber_pos(x=16.6, y=-2.5, z=0) #center hole
    #huber_pos(15, 4, 0) #side hole
@as_shortcut
def FrostedYAG():
    """Shortcut: call huber_pos with the fixed position x=16.6, y=10.5, z=0."""
    huber_pos(x=16.6, y=10.5, z=0)
@as_shortcut
def KnifeEdgeX():
    """Shortcut: call huber_pos with x=-9.3, y=9, z=0 and att=1e-5."""
    huber_pos(x=-9.3, y=9, z=0, att=1e-5)
@as_shortcut
def KnifeEdgeY():
    """Shortcut: call huber_pos with x=-6, y=14, z=0 and att=1e-5."""
    huber_pos(x=-6, y=14, z=0, att=1e-5)
#@as_shortcut
#def JetYAG():
# huber_pos(15.3, 2.2,0)
# Nine Marker objects: for each of the three named groups ("Jet", "YAG Jet",
# "Solid Sample") one marker per primeSample axis (x, y, z).
m1 = Marker(primeSample.x, name="Jet X")
m2 = Marker(primeSample.y, name="Jet Y")
m3 = Marker(primeSample.z, name="Jet Z")
m4 = Marker(primeSample.x, name="YAG Jet X")
m5 = Marker(primeSample.y, name="YAG Jet Y")
m6 = Marker(primeSample.z, name="YAG Jet Z")
m7 = Marker(primeSample.x, name="Solid Sample X")
m8 = Marker(primeSample.y, name="Solid Sample Y")
m9 = Marker(primeSample.z, name="Solid Sample Z")
# Targets read by go_to_t0_harmonics / go_to_t0_topas. While a value is None,
# the corresponding shortcut only prints a reminder and does not set anything.
t0_harmonics = None
t0_topas = None
@as_shortcut
def go_to_t0_harmonics():
    """Shortcut: set laser.pumpHarmonics_delay.motor to the module-level t0_harmonics.

    Prints a reminder and does nothing else while t0_harmonics is None.
    """
    if t0_harmonics is None:
        print("please define t0_harmonics")
        return
    laser.pumpHarmonics_delay.motor.set(t0_harmonics)
@as_shortcut
def go_to_t0_topas():
    """Shortcut: set laser.pumpTopas_delay.motor to the module-level t0_topas.

    Prints a reminder and does nothing else while t0_topas is None.
    """
    if t0_topas is None:
        print("please define t0_topas")
        return
    laser.pumpTopas_delay.motor.set(t0_topas)