Files
SwissMX/psi_device.py
2023-06-29 10:13:29 +02:00

169 lines
5.5 KiB
Python

import logging
from math import ceil
_log=logging.getLogger(__name__)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,format='%(name)s:%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
logging.getLogger('matplotlib').setLevel(logging.INFO)
import sys,os,socket
sys.path.insert(0, os.path.expanduser('..'))
hostname=socket.gethostname()
if hostname=='ganymede':
sys.path.insert(0, os.path.expanduser('~/Documents/prj/SwissFEL/PBTools/'))
else:
sys.path.insert(0, os.path.expanduser('/sf/cristallina/applications/mx/zamofing_t/PBTools/'))
sys.path.insert(0, os.path.expanduser('/sf/cristallina/applications/mx/slic'))
#("/photonics/home/gac-cristall/Documents/swissmx_cristallina/slic/"))
#from slic.core.acquisition import SFAcquisition
from PyQt5.QtWidgets import (QApplication,)
from app_config import AppCfg #settings, option, toggle_option
import epics
from pbtools.misc.pp_comm import PPComm
from pbtools.misc.gather import Gather
import shapepath
try:
from slic.core.acquisition import SFAcquisition
except ImportError as e:
_log.warning(e)
class Shutter:
def __init__(self,mode=1):
self._mode=mode
def open(self):
mode=self._mode
if mode==0:
_log.info('open simulated shutter')
elif mode==1:
# open laser shutter
# epics.caput("SLAAR02-LMOT-M262:MOT.VAL", 1)
# time.sleep(4)
# open fast shutter
epics.caput("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP", 1)
_log.info('shutter opened')
def close(self):
mode=self._mode
if mode==0:
_log.info('close simulated shutter')
elif mode==1:
# close fast shutter
epics.caput("SARES30-LTIM01-EVR0:RearUniv0-Ena-SP", 0)
# close laser shutter
# caput("SLAAR02-LMOT-M262:MOT.VAL", 40)
_log.info('shutter closed')
class Deltatau:
def __init__(self,sim=False):
app=QApplication.instance()
cfg=app._cfg
host=cfg.value(AppCfg.DT_HOST)
if sim:
self._comm=comm=None
self._gather=gather=None
else:
hpp=host.split(':')
param={'host':hpp[0]}
if len(hpp)>1:
param['port']=int(hpp[1])
if len(hpp)>2:
param['fast_gather_port']=int(hpp[2])
_log.info(' -> ssh-tunneling PPComm({host}:{port} {host}:{fast_gather_port})'.format(**param))
try:
self._comm=comm=PPComm(**param,timeout=2.0)
self._gather=gather=Gather(comm)
except (socket.timeout,socket.gaierror) as e:
_log.critical(f'can not connect to deltatau:"{host}" -> {e}')
self._shapepath=sp=shapepath.ShapePath(comm, gather, verbose=0xff, sync_mode=1, sync_flag=3)
class Jungfrau:
def __init__(self,sim=False):
#python /sf/jungfrau/applications/daq_client/daq_client.py -h
#python /sf/jungfrau/applications/daq_client/daq_client.py -p p19739 -t no_beam_test
# -c/sf/cristallina/config/channel_lists/channel_list_bs-e/sf/cristallina/config/channel_lists/channel_list_ca
# -f/sf/cristallina/config/jungfrau/jf_1d5M.json
# --start_pulseid 15382163895 --stop_pulseid 15382163905
#rsync -vai gac-cristall@saresc-cons-03:/sf/jungfrau/applications/daq_client/daq_client.py .
# setup slic parameters
if sim:
self._daq=None
return
app=QApplication.instance()
cfg=app._cfg
detectors = [ cfg.value(AppCfg.DAQ_DET) ]
bs_channels = cfg.value(AppCfg.DAQ_BS_CH)
pv_channels = cfg.value(AppCfg.DAQ_PV_CH)
loc=cfg.value(AppCfg.DAQ_LOC)
try:
self._daq=SFAcquisition(
loc['end_station'], loc['p_group'],
default_detectors=detectors, default_channels=bs_channels, default_pvs=pv_channels,
rate_multiplicator=1, append_user_tag_to_data_dir=True
)
self._pv_pulse_id=epics.PV('SAR-EXPMX-EVR0:RX-PULSEID')
self._pv_pulse_id.connect()
except NameError as e:
_log.critical(f'Jungfrau not connected: {e}')
def acquire(self, n_pulses, wait=False):
if self._daq is None:
_log.info(f'simulated')
return
app=QApplication.instance()
cfg=app._cfg
run=cfg.value(AppCfg.DAQ_RUN)
try:
self._pulse_id_start=int(self._pv_pulse_id.value)
except TypeError as e:
_log.warning(f'failed to get _pulse_id_start: {e}')
if self._daq is not None:
n_pulses_run = n_pulses + run['padding']
self._daq.acquire(run['prefix'], n_pulses=min(n_pulses_run, 5000), n_repeat=ceil(n_pulses_run/5000), wait=False, cell_name=run['cell_name'])
#self._daq.acquire(run['prefix'], n_pulses=n_pulses, wait=False)
pass
#run['id']+=1
cfg.setValue(AppCfg.DAQ_RUN,run)
def gather_upload(self):
if self._daq is None:
_log.info(f'simulated')
return
try:
self._pulse_id_end=int(self._pv_pulse_id.value)
except TypeError as e:
_log.warning(f'failed to get _pulse_id_start: {e}')
else:
_log.debug(f'pulse_id: {self._pulse_id_start}..{self._pulse_id_end}')
if __name__ == "__main__":
import os
import argparse
if hostname=='ganymede':
# use EPICS locally
# os.environ['EPICS_CA_ADDR_LIST']='localhost'
# use EPICS if connected to ESC network
os.environ['EPICS_CA_ADDR_LIST']='129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066'
parser = argparse.ArgumentParser()
parser.add_argument("--mode", "-m", help="qt test", type=lambda x:int(x, 0), default=0)
args = parser.parse_args()
_log.info('Arguments:{}'.format(args.__dict__))
app=QApplication(sys.argv)
app._cfg=cfg=AppCfg()
if args.mode &0x01:
dt=Deltatau()
if args.mode&0x02:
jf=Jungfrau()
jf.acquire(n_pulses=100, wait=True)
jf.gather_upload()