# Files
# bdbase/base.py
#
# 2248 lines
# 87 KiB
# Python

""" Base accelerator module
"""
import argparse
from collections import OrderedDict
from datetime import datetime
import getpass
import h5py
import inspect
import logging
import numpy as np
import platform
import os
import re
import sys
import time
# Third-party modules
from qtpy.QtCore import (PYQT_VERSION_STR, QEventLoop, QFile, QIODevice,
QSettings, QSize, Qt, QThread, QTimer, Signal, Slot)
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtGui import (QColor, QKeySequence, QIcon, QPainter, QPalette,
QPixmap)
from qtpy.QtPrintSupport import QPrinter, QPrintDialog
from qtpy.QtWidgets import (QAction, QApplication, QDialog, QFrame, QLabel,
QMainWindow, QMessageBox, QPlainTextEdit,
QProgressBar, QScrollArea, QSizePolicy,
QSplashScreen, QVBoxLayout, QWidget)
from pyqtacc.bdbase.utils import _line
from pyqtacc.bdbase.enumkind import Facility, MsgSeverity, UserMode
from pyqtacc.bdbase.helpbrowser import HelpBrowser
from pyqtacc.bdbase.readjson import ReadJSON
from pyqtacc.bdbase.savehdf import QSaveHDF
from pyqtacc.bdbase.hdf5filemenu import HDF5GroupBox
from pyqtacc.bdbase.sendelog import QSendToELOG
from pyqtacc.bdbase.screenshot import QScreenshot
from pyqtacc.bdbase.guiframe import GUIFrame
from caqtwidgets.pvwidgets import QNoDockWidget
import PyCafe
# Directory that contains this module, and the module's own file name.
_abspath = os.path.dirname(os.path.abspath(__file__))
_pymodule = os.path.basename(__file__)
# Splits "<name>.<ext>"; the two-name unpacking requires exactly one "."
# in the file name.
_appname, _appext = _pymodule.split(".")
# Fallback version and author recorded by BaseWindow.
_appversion = "1.0.0"
_author = "J. Chrin"
# Integer codes sent through the progress-bar signals. In this part of the
# file only PROGRESS_THREAD_END is emitted (by HDFThread when it stops);
# presumably the others mark intermediate thread states - TODO confirm
# against the receive_progressbar slot.
PROGRESS_THREAD_INIT = 0
PROGRESS_THREAD_START = 1
PROGRESS_THREAD_ABORTING = 2
PROGRESS_THREAD_ABORTED = 3
PROGRESS_THREAD_ERROR = 4
PROGRESS_THREAD_END = 100
# Minimum size of the central widget when the facility is SwissFEL.
CENTRAL_WIDGET_MINIMUM_HEIGHT = 840
CENTRAL_WIDGET_MINIMUM_WIDTH = 1240
# Minimum size of the central widget for every other facility
# (BaseWindow.__init__ uses these in its non-SwissFEL branch).
SLS_CENTRAL_WIDGET_MINIMUM_HEIGHT = 840
SLS_CENTRAL_WIDGET_MINIMUM_WIDTH = 940
class BaseWindow(QMainWindow):
""" BaseWindow
"""
####trigger_elog_entry = Signal(bool, str, str)
trigger_log_message = Signal(str, str, int, str, dict)
trigger_progressbar = Signal(int)
trigger_progressbar_str = Signal(int, str)
##trigger_hdf_save = Signal()
class SaveFigureThread(QThread):
    """Worker thread that writes the result figures to PNG files.

    The paths of the images are collected and, when at least one was
    gathered, stored on the parent window as ``parent.attach_files``.
    """

    def __init__(self, parent, folder_name, time_in_seconds,
                 reanalysis_time):
        """Remember the destination and take references to the parent data.

        Args:
            parent: Owning window; ``settings``, ``all_data``,
                ``all_data_2``, ``add_date_to_path`` and
                ``trigger_log_message`` are read from it.
            folder_name: Path prefix to which the dated file name is
                appended.
            time_in_seconds: Timestamp forwarded to
                ``parent.add_date_to_path``.
            reanalysis_time: Reanalysis timestamp forwarded to
                ``parent.add_date_to_path``.
        """
        QThread.__init__(self)
        self.parent = parent
        self.settings = self.parent.settings
        self.folder_name = folder_name
        self.time_in_seconds = time_in_seconds
        self.reanalysis_time = reanalysis_time
        self.all_data = self.parent.all_data
        self.all_data_2 = self.parent.all_data_2

    # No __del__ calling self.wait() here: it causes
    # "QThread::wait: Thread tried to wait on itself".

    def run(self):
        """Save every available figure and publish the list of file names."""
        attach_files = []
        folder_name = self.folder_name
        date_str = self.parent.add_date_to_path(
            time_in_seconds=self.time_in_seconds,
            reanalysis_time_in_seconds=self.reanalysis_time)

        def extract_and_attach(i, nfig, name, all_fig_data):
            """Save up to nfig figures of canvas i+1 and record their paths."""
            canvas = 'Canvas {0}'.format(i+1)
            name_base = name.replace(' ', '_').lower()
            # The permission warning is logged only once per canvas
            write_message_fired = False
            if all_fig_data[canvas] is not None:
                nfig_canvas = min(len(all_fig_data[canvas]), nfig)
            else:
                nfig_canvas = nfig
            for idx in range(0, nfig_canvas):
                if all_fig_data[canvas] is None:
                    continue
                name = name_base + "_{0}".format(
                    idx) if idx > 0 else name_base
                save_dest = (folder_name + date_str + '_' + name +
                             '.png')
                _dirname = os.path.dirname(save_dest)
                if not os.path.isfile(save_dest):
                    if all_fig_data[canvas][idx] is not None:
                        if not os.path.exists(_dirname):
                            try:
                                os.makedirs(_dirname, exist_ok=True)
                            except OSError:
                                # Directory could not be created; the
                                # access check below then fails and the
                                # warning is logged instead of the
                                # thread dying on the exception.
                                pass
                        if os.access(_dirname, os.W_OK):
                            all_fig_data[canvas][idx].savefig(save_dest)
                        elif not write_message_fired:
                            _mess = ("Do not have write permission " +
                                     "for directory {0} from this " +
                                     "host {1}. Images not saved and " +
                                     "cannot be sent to elog").format(
                                         _dirname, os.uname()[1])
                            self.parent.trigger_log_message.emit(
                                MsgSeverity.WARN.name, _pymodule,
                                _line(), _mess, {})
                            write_message_fired = True
                if not write_message_fired:
                    attach_files.append(save_dest)

        try:
            resultsSeq = self.settings.data["GUI"]["resultsSeq"]
            titleSeq = self.settings.data["GUI"]["subResultsTabTitle"]
            if self.all_data:
                fig_data = self.all_data['Figure data']
                for i, (nfig, name) in enumerate(zip(resultsSeq, titleSeq)):
                    print(i, nfig, name, flush=True)
                    print(fig_data, flush=True)
                    extract_and_attach(i, nfig, name, fig_data)
        except KeyError:
            # Best effort: a missing configuration or data key ends the
            # processing of this section without failing the thread.
            pass
        try:
            resultsSeq = self.settings.data["GUI2"]["resultsSeq"]
            titleSeq = self.settings.data["GUI2"]["subResultsTabTitle"]
            if self.all_data_2:
                fig_data = self.all_data_2['Figure data']
                for i, (nfig, name) in enumerate(zip(resultsSeq, titleSeq)):
                    extract_and_attach(i, nfig, name, fig_data)
        except KeyError:
            # Same best-effort handling for the "GUI2" section.
            pass
        #Not so nice.. send a signal instead?
        if attach_files:
            self.parent.attach_files = attach_files
            print(attach_files, flush=True)
            print("All files attached", flush=True)
        else:
            print("No files to attach", flush=True)
        time.sleep(0.1) #avoid race condition
class HDFSave(QThread):
    """Thread that writes the parent's data to an HDF5 file."""

    def __init__(self, parent, from_dialog):
        """Keep the owning window and whether the save dialog started us."""
        QThread.__init__(self)
        self.parent = parent
        self.from_dialog=from_dialog

    # Only a precaution, not experienced:
    # a __del__ calling self.wait() causes
    # "QThread::wait: Thread tried to wait on itself".

    def run(self):
        """Derive the timestamps, pick the file name and write the file.

        Sets ``parent.from_hdf`` and ``parent.hdf_save_completed`` and
        reports the outcome through ``parent.trigger_log_message``.
        Nothing is done when the parent's ``all_data`` is None.
        """
        QApplication.processEvents(QEventLoop.ExcludeUserInputEvents, 5000)
        self.all_data = self.parent.all_data
        if self.all_data is None:
            return

        # Timestamp of the measurement
        try:
            ambient = self.all_data['Ambient data']
            if 'Time in seconds' in ambient:
                ts_in_seconds = ambient['Time in seconds']
            else:
                ts_in_seconds = None
        except KeyError:
            ts_in_seconds = None

        # Reanalysis information
        now_in_seconds = None
        from_hdf = False
        try:
            processed = self.all_data['Processed data']
            if 'Reanalysis time in seconds' in processed:
                from_hdf = bool(processed['Reanalysis time'])
                if from_hdf:
                    now_in_seconds = processed['Reanalysis time in seconds']
            # Double check
            if not from_hdf and 'from_hdf' in processed:
                from_hdf = bool(processed['from_hdf'])
        except KeyError:
            now_in_seconds = None

        owner = self.parent
        owner.from_hdf = from_hdf
        print("t=========================>", ts_in_seconds, " // ", now_in_seconds)
        print("from hdf5=========================>", from_hdf)

        if owner.hdf_filename is None or not self.from_dialog:
            owner.set_new_hdf_filename(ts_in_seconds, now_in_seconds)

        try:
            print("FILENAME ==", owner.hdf_filename, flush=True)
            with h5py.File(owner.hdf_filename, 'w') as dataH5:
                # The experiment group is only written for data that did
                # not come from an HDF file
                if not from_hdf:
                    owner.add_pvs_to_hdf(
                        dataH5, pv_list=owner.pv_machine_list,
                        from_hdf=from_hdf)
                owner.add_general_to_hdf(dataH5)
                owner.add_to_hdf(dataH5, proc=True, raw=True)
                owner.hdf_save_completed = True
                _mess = "Processed data saved to {}".format(
                    owner.hdf_filename)
                owner.trigger_log_message.emit(
                    MsgSeverity.INFO.name, _pymodule, _line(), _mess,
                    {})
        except OSError as e:
            _mess = "OSError in saving to file {0}: \n{1}".format(
                owner.hdf_filename, str(e))
            owner.trigger_log_message.emit(
                MsgSeverity.ERROR.name, _pymodule, _line(), _mess, {})
            owner.hdf_save_completed = False
class HDFThread(QThread):
    """Thread that loads an HDF file and re-runs the analysis on it.

    The result dictionary is delivered through ``trigger_thread_event``.
    """
    trigger_thread_event = Signal(dict)

    def __init__(self, parent, analysis_procedure, all_data):
        """Store the collaborators and the name of the file to load.

        Args:
            parent: Owning window; its ``hdf_filename_loaded`` names the
                file to read.
            analysis_procedure: Object expected to provide
                ``load_hdf_file`` and ``reanalyze``.
            all_data: Initial data; replaced in ``run`` by the content
                returned from ``load_hdf_file``.
        """
        QThread.__init__(self)
        self.parent = parent
        self.analysis_procedure = analysis_procedure
        self.all_data = all_data
        self.hdf_filename_loaded = self.parent.hdf_filename_loaded

    # Only a precaution, not experienced:
    # a __del__ calling self.wait() causes
    # "QThread::wait: Thread tried to wait on itself".

    def run(self):
        """Load the HDF file, reanalyze it and emit the result."""
        if not hasattr(self.analysis_procedure, 'load_hdf_file'):
            mess = ("Analysis not configured for HDF analysis! " +
                    "Missing method: load_hdf_file")
            self.parent.trigger_log_message.emit(
                MsgSeverity.ERROR.name,_pymodule, _line(), mess, {})
            self.parent.trigger_progressbar.emit(PROGRESS_THREAD_END)
            return

        self.all_data = self.analysis_procedure.load_hdf_file(
            self.hdf_filename_loaded)

        if not self.all_data:
            self.parent.trigger_progressbar.emit(PROGRESS_THREAD_END)
            return

        if not hasattr(self.analysis_procedure, 'reanalyze'):
            mess = ("Analysis not configured for HDF analysis! " +
                    "Missing method: reanalyze")
            self.parent.trigger_log_message.emit(
                MsgSeverity.ERROR.name, _pymodule, _line(), mess, {})
            self.parent.trigger_progressbar.emit(PROGRESS_THREAD_END)
            return

        # Keep the 'experiment' group of the file, if it has one, so it
        # can be carried over into the reanalysis result.
        try:
            expt_dict = self.all_data['experiment']
        except KeyError:
            expt_dict = None

        mess = "HDF file {} analysis proceeding...".format(
            self.hdf_filename_loaded)
        self.parent.trigger_log_message.emit(MsgSeverity.INFO.name,
                                             _pymodule, _line(),
                                             mess, {})

        all_dict = self.analysis_procedure.reanalyze(self.all_data)

        # Emit results
        if all_dict is not None:
            # Only touch the result once it is known to exist; doing so
            # before this check raised TypeError when reanalyze
            # returned None.
            if expt_dict is not None:
                all_dict['experiment'] = expt_dict
            self.parent.from_hdf = True
            self.trigger_thread_event.emit(all_dict)
            mess = "HDF file {} analysis succeeded".format(
                self.hdf_filename_loaded)
            self.parent.trigger_log_message.emit(
                MsgSeverity.INFO.name, _pymodule, _line(), mess, {})
        else:
            mess = "HDF file {} has wrong content!!".format(
                self.hdf_filename_loaded)
            self.parent.trigger_log_message.emit(
                MsgSeverity.ERROR.name, _pymodule, _line(), mess, {})
class AnalysisThread(QThread):
    """Thread that runs ``measure_and_analyze`` of the analysis procedure.

    A non-empty result is delivered through ``trigger_thread_event``.
    """
    trigger_thread_event = Signal(dict)

    def __init__(self, parent, analysis_procedure, input_parameters,
                 messages: dict = None):
        """Store the collaborators and the log messages.

        Args:
            parent: Owning window; its ``trigger_log_message`` signal is
                used for reporting.
            analysis_procedure: Object providing ``measure_and_analyze``.
            input_parameters: Passed unchanged to ``measure_and_analyze``.
            messages: Optional dict with the keys "success" and "fail".
                Missing keys fall back to the built-in texts.
        """
        QThread.__init__(self)
        self.parent = parent
        self.analysis_procedure = analysis_procedure
        self.input_parameters = input_parameters
        # Built per instance: a dict literal as the default argument
        # would be one object shared by every thread ever created.
        self.messages = {
            "success": "Analysis completed",
            "fail": "No data returned from analysis procedure"}
        if messages:
            self.messages.update(messages)
        try:
            if input_parameters['debug']:
                print("AnalysisThread", self.input_parameters, flush=True)
        except KeyError:
            pass

    def __del__(self):
        # NOTE(review): the sibling thread classes removed this because of
        # "QThread::wait: Thread tried to wait on itself" - confirm that
        # it is still wanted here.
        self.wait()

    def run(self):
        """Run the measurement/analysis and report the outcome."""
        print("RUN IN BASE CLASS", flush=True)
        all_dict = self.analysis_procedure.measure_and_analyze(
            self.input_parameters)
        # Emit results
        if all_dict:
            self.trigger_thread_event.emit(all_dict)
            mess = self.messages['success']
            self.parent.trigger_log_message.emit(
                MsgSeverity.INFO.name, _pymodule, _line(), mess, {})
        else:
            mess = self.messages['fail']
            self.parent.trigger_log_message.emit(
                MsgSeverity.WARN.name, _pymodule, _line(), mess, {})
def __init__(self, parent=None, pymodule=None, appversion=None, title="",
             user_mode=UserMode.OPERATION, facility=Facility.SwissFEL,
             extended=True, has_optics=True, has_procedure=True):
    """Read the configuration and build the main window.

    Args:
        parent: Parent widget passed to QMainWindow.
        pymodule: File name of the application module; defaults to the
            name of this file.
        appversion: Application version; defaults to the base version.
        title: Stored as ``self.title``.
        user_mode: Forwarded to the facility specific GUIHeader.
        facility: Selects the GUIHeader implementation and the minimum
            size of the central widget.
        extended: Forwarded to GUIHeader.
        has_optics: Forwarded to GUIFrame.
        has_procedure: Forwarded to GUIFrame.

    Raises:
        ValueError: If ``facility`` is not one of the supported values.
    """
    super(BaseWindow, self).__init__(parent)
    self.parent = parent
    self.has_optics = has_optics
    self.pymodule = pymodule if pymodule else _pymodule
    # The caller's version is kept; it used to be overwritten
    # unconditionally by the base module version a few lines later.
    self.appversion = appversion if appversion else _appversion
    self.source_file = None
    self.author = _author
    self.title = title
    self.facility = facility
    self.user_mode = user_mode
    # partition() never raises, whatever the number of dots in the name,
    # and agrees with the split(".")[0] used further down.
    self.appname, _, self.appext = self.pymodule.partition(".")

    # os.getlogin() raises OSError without a controlling terminal
    # (e.g. when started from a desktop launcher).
    try:
        login_name = os.getlogin()
    except OSError:
        login_name = getpass.getuser()

    print("=============================================")
    print("Starting {0} at: {1}".format(self.appname, datetime.now()))
    print("User: {0} Host: {1}".format(login_name, os.uname()[1]))
    print("=============================================", flush=True)

    self.settings = ReadJSON(self.appname)

    # Channel access
    self.cafe = PyCafe.CyCafe()
    self.cyca = PyCafe.CyCa()
    self.cafe_exception = PyCafe.CafeException
    self.cafe.enableExceptions = False
    self.caget = self.cafe.caget
    self.caput = self.cafe.caput

    self.application_recent_files = []
    self.application_geometry = None
    self.screenshot_titles = []
    self.screenshot_items = []
    self.filename = None
    self.hdf_filename_loaded = "NONE" #For loading into hdf dockwidget
    self.hdf_filename = None #For saving
    self.hdf_dialog = None
    self.from_hdf = False
    self.daq_analysis_completed = False

    self.setObjectName("MainWindow")
    self.setWindowTitle(self.appname)
    self.statusbar_label = QLabel()
    self.statusbar = self.statusBar()
    self.progressbar = QProgressBar(self)
    self.menu = self.menuBar()

    self.elog_dest = self.settings.data["Elog"]["destination"]
    try:
        self.elog_add_date_to_dir = self.settings.data["Elog"]["addDateToDir"]
    except KeyError:
        self.elog_add_date_to_dir = True
    self.screenshot_dest = self.settings.data["screenshot"]["destination"]
    self.stdlog_dest = (self.settings.data["stdlog"]["destination"] +
                        self.appname + "-" + login_name + ".log")

    self.logging = logging
    #self.logging.basicConfig(filename=self.stdlog_dest, level=logging.DEBUG)
    self.logging.basicConfig(level=logging.NOTSET)
    self.logger = self.logging.getLogger(__name__)
    self.logger.info("Logging activated")

    self.date_str = None
    self.autopost_elog = True

    # Optional configuration entries
    try:
        self.hdf_dest = self.settings.data["hdf"]["destination"]
    except KeyError:
        self.hdf_dest = None
    try:
        self.pv_machine_list = self.settings.data["hdf"]["experiment"]
    except KeyError:
        self.pv_machine_list = None
    try:
        self.autopost_epics = self.settings.data["menuFlags"]["hasEpics"]
    except KeyError as error:
        print("KeyError in base.py, init:", error)
        self.autopost_epics = False
    try:
        self.autopost_hdf = self.settings.data["menuFlags"]["hasH5"]
    except KeyError as error:
        print("KeyError in base.py, init:", error)
        self.autopost_hdf = False

    # Without HDF support there is nothing left to save
    self.hdf_save_completed = not self.autopost_hdf

    self.all_input_parameters = {} #gui
    self.all_input_labels = {} #gui
    self.all_expert_parameters = {} #gui
    self.all_expert_labels = {} #gui
    self.input_parameters = {} #analysis thread
    self.input_labels = {}
    self.expert_parameters = {} #added to input_parameters
    self.expert_labels = {}
    self.read_input_parameters()

    self.all_data = {}
    self.all_data_2 = {}

    self.hdf_thread = None
    self.save_hdf_thread = None
    self.analysis_thread = None
    self.analysis_procedure = None
    try:
        from src.analysis import AnalysisProcedure
        self.analysis_procedure = AnalysisProcedure(self)
        print("Base class has user supplied AnalysisProcedure class.",
              flush=True)
    except ImportError as e:
        print(("Base class without user supplied AnalysisProcedure class."
               + " import Error:"), e, flush=True)

    # Colour lookup, valid for both the enum members and their names
    severity_keys = (
        (MsgSeverity.FATAL, "fatal"), (MsgSeverity.ERROR, "error"),
        (MsgSeverity.WARN, "warn"), (MsgSeverity.WARNING, "warn"),
        (MsgSeverity.INFO, "info"), (MsgSeverity.DEBUG, "debug"))
    self.msg_severity_qcolor = {}
    for severity, colour_key in severity_keys:
        self.msg_severity_qcolor[severity] = QColor(
            self.settings.data["MsgSeverity"][colour_key])
    for severity, colour_key in severity_keys:
        self.msg_severity_qcolor[severity.name] = QColor(
            self.settings.data["MsgSeverity"][colour_key])

    #QSettings
    self.restore_application_settings()

    self.init_toolbar()
    self.init_statusbar_wgt()
    self.init_progressbar_wgt()

    self.trigger_log_message.connect(self.receive_log_message)
    self.trigger_progressbar.connect(self.receive_progressbar)
    self.trigger_progressbar_str.connect(self.receive_progressbar)
    self.statusbar.addPermanentWidget(self.progressbar, 0)

    self.appname = self.pymodule.split(".")[0]
    self.mainwindow = QWidget()
    self.mainwindow_layout = QVBoxLayout()

    # NOTE(review): the QSendToELOG imported in each branch is bound as a
    # local name only and is not used within this method - confirm where
    # the facility specific class is meant to be picked up.
    if self.facility == Facility.SwissFEL:
        from pyqtacc.sf.guiheader import GUIHeader
        from pyqtacc.bdbase.sendelog import QSendToELOG
    elif self.facility == Facility.SLS:
        from pyqtacc.sls.guiheader import GUIHeader
        from pyqtacc.sls.sendelogsls import QSendToELOG
    elif self.facility == Facility.HIPA:
        from pyqtacc.hipa.guiheader import GUIHeader
        from pyqtacc.hipa.sendeloghipa import QSendToELOG
    elif self.facility == Facility.PROSCAN:
        from pyqtacc.proscan.guiheader import GUIHeader
        from pyqtacc.proscan.sendelogproscan import QSendToELOG
    elif self.facility == Facility.ESS:
        from pyqtacc.ess.guiheader import GUIHeader
        from pyqtacc.ess.sendelogess import QSendToELOG
    else:
        # Previously surfaced as an UnboundLocalError on GUIHeader
        raise ValueError(
            "Unsupported facility: {0}".format(self.facility))

    self.gui_header = GUIHeader(self, user_mode=self.user_mode,
                                extended=extended)
    self.mainwindow_layout.addWidget(self.gui_header.header_wgt)

    self.gui_frame = GUIFrame(self, has_optics=self.has_optics,
                              has_procedure=has_procedure)
    self.show_log_message = self.gui_frame.show_log_message

    if self.autopost_hdf:
        self.hdf_dock_widget = QNoDockWidget(" HDF5", self)
        self.init_hdf_analysis_wgt()

    self.mainwindow_layout.addWidget(self.gui_frame.central_tab_widget)
    self.mainwindow.setLayout(self.mainwindow_layout)

    if self.facility == Facility.SwissFEL:
        self.mainwindow.setMinimumHeight(CENTRAL_WIDGET_MINIMUM_HEIGHT)
        self.mainwindow.setMinimumWidth(CENTRAL_WIDGET_MINIMUM_WIDTH)
    else:
        self.mainwindow.setMinimumHeight(SLS_CENTRAL_WIDGET_MINIMUM_HEIGHT)
        self.mainwindow.setMinimumWidth(SLS_CENTRAL_WIDGET_MINIMUM_WIDTH)

    self.setCentralWidget(self.mainwindow)
    self.show_log_message(MsgSeverity.INFO.name, _pymodule, _line(),
                          "Application configured")
def read_input_parameters(self):
    """Fill the parameter and label dictionaries from the configuration.

    The "Parameters" section feeds ``input_parameters`` /
    ``input_labels`` (entries whose "flag" is set) and the ``all_input_*``
    dictionaries (every entry). The optional "Expert" section feeds the
    ``expert_*`` / ``all_expert_*`` dictionaries in the same way.

    An entry takes its value from ``data["value"]`` or, failing that,
    from the configuration item addressed by ``data["link"]``. A link
    that cannot be resolved is reported on stdout and leaves the entry
    without a value; its label is still recorded.
    """
    config = self.settings.data

    def resolve_link(data):
        """Return the configuration item that data["link"] points to."""
        link_list = data["link"] if "link" in data else []
        if not link_list:
            # Neither "value" nor a usable "link": handled like any
            # other unresolvable link (this used to be an IndexError).
            raise KeyError("link")
        target = config[link_list[0]]
        if len(link_list) > 1:
            for link in link_list[1:]:
                target = target[link]
            return target
        # A single link yields a new list of the items (for a dict: of
        # the keys) of the addressed section.
        return list(target)

    def load_section(section, selected, everything, selected_labels,
                     all_labels):
        """Transfer one configuration section into the dictionaries."""
        for key, dictval in config[section].items():
            if not isinstance(dictval, dict):
                continue
            data = dictval["data"]
            flagged = dictval["flag"]
            if "value" in data:
                if flagged:
                    selected[key] = data["value"]
                everything[key] = data["value"]
            else:
                try:
                    val = resolve_link(data)
                except KeyError as e:
                    print("Key Error in base.py initialization:", e)
                else:
                    if flagged:
                        selected[key] = val
                    everything[key] = val
            if flagged:
                selected_labels[key] = data["text"]
            all_labels[key] = data["text"]

    load_section("Parameters", self.input_parameters,
                 self.all_input_parameters, self.input_labels,
                 self.all_input_labels)

    if "Expert" not in config:
        return

    load_section("Expert", self.expert_parameters,
                 self.all_expert_parameters, self.expert_labels,
                 self.all_expert_labels)
def init_toolbar(self):
    """Create the menus, the toolbars and their actions.

    Which output actions exist depends on ``autopost_epics`` and
    ``autopost_hdf``. A ``None`` entry in an action list stands for a
    separator (see ``add_actions``).
    """
    #menu / toolbar
    font = self.menu.font()
    font.setPointSize(12)
    self.menu.setFont(font)

    menu_file = self.menu.addMenu("&Output")
    menu_file.setFont(font)
    menu_file_actions = []

    menu_view = self.menu.addMenu("&View")
    menu_view.setFont(font)
    menu_view_actions = []

    menu_help = self.menu.addMenu("&Help")
    menu_help.setFont(font)
    menu_help_actions = []

    file_toolbar = self.addToolBar("Output")
    file_toolbar.setObjectName("FileToolBar")
    view_toolbar = self.addToolBar("View")
    view_toolbar.setObjectName("ViewToolBar")
    # To implement
    #menu_daq = self.menu.addMenu("&DAQ")
    #menu_daq_actions = []
    #daq_toolbar = self.addToolBar("DAQ")
    #daq_toolbar.setObjectName("ViewToolBar")
    help_toolbar = self.addToolBar("Help")
    help_toolbar.setObjectName("HelpToolBar")

    if self.autopost_epics:
        epics_action = self.create_action(
            "&EPICS", slot=self.save_to_epics, shortcut="Ctrl+Shift+E",
            icon="epics", tip="Write to EPICS PVs")
        menu_file_actions.append(epics_action)

    if self.autopost_hdf:
        hdf_action = self.create_action(
            "Save&HDF", slot=self.save_to_hdf_dialog, shortcut="Ctrl+H",
            icon="hdf", tip="Save as HDF")
        menu_file_actions.append(hdf_action)

    elog_action = self.create_action(
        "&Elog", slot=self.send_to_elog, shortcut="Alt+E", icon="elog",
        tip="Send to ELOG")
    menu_file_actions.append(elog_action)

    # Separator. The former QAction().setSeparator(True) evaluated to
    # None as well, after building a QAction that was thrown away.
    menu_file_actions.append(None)

    screenshot_action = self.create_action(
        "&Screenshot", slot=self.take_screenshot, shortcut="Alt+S",
        icon="screenshot", tip="Screenshot of selected items")
    menu_file_actions.append(screenshot_action)

    print_action = self.create_action(
        "&Print", slot=self.send_to_printer, shortcut=QKeySequence.Print,
        icon="fileprint", tip="Send to printer")
    menu_file_actions.append(print_action)

    stdout_action = self.create_action(
        "stdout &Log", slot=self.open_stdout_log, shortcut="Alt+L",
        icon="filestdlog", tip="Open stdout log file")
    menu_view_actions.append(stdout_action)

    clear_log_action = self.create_action(
        "&Clear log window", slot=self.clear_log_window, shortcut="Alt+C",
        icon="viewclearlog", tip="Clears message log window")
    menu_view_actions.append(clear_log_action)

    quit_action = self.create_action(
        "&Quit", slot=self.close, shortcut="Ctrl+Q", icon="filequit",
        tip="Exit application, closing all windows")
    menu_file_actions.append(None)  # separator in front of Quit
    menu_file_actions.append(quit_action)

    help_action = self.create_action(
        "Help", slot=self.show_help, shortcut=QKeySequence.HelpContents,
        icon="home", tip="Help!")
    menu_help_actions.append(help_action)

    about_action = self.create_action(
        "About", slot=self.show_about, shortcut=QKeySequence.WhatsThis,
        icon="helpabout", tip="About this app")
    menu_help_actions.append(about_action)

    self.add_actions(menu_file, tuple(menu_file_actions))
    self.add_actions(menu_view, tuple(menu_view_actions))
    self.add_actions(menu_help, tuple(menu_help_actions))
    # Quit is left out of the file toolbar; it goes to the exit toolbar
    self.add_actions(file_toolbar, tuple(menu_file_actions[:-1]))
    self.add_actions(view_toolbar, tuple(menu_view_actions))
    self.add_actions(help_toolbar, tuple(menu_help_actions))

    save_all_action = self.create_action(
        "Save&ALL", slot=self.write_to_destinations,
        shortcut="Ctrl+Z", icon="save_all", tip="Save All")

    #toolbar
    # Place ExitWidget on the far right
    exit_toolbar = self.addToolBar("EXIT")
    exit_toolbar.setObjectName("ExitToolBar")
    spacer_wgt = QWidget()
    spacer_wgt.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
    spacer_wgt.setVisible(True)
    exit_toolbar.addWidget(spacer_wgt)

    # "Save All" is only offered when more than one destination is on
    true_list = [self.autopost_epics, self.autopost_hdf, self.autopost_elog]
    if true_list.count(True) > 1:
        exit_toolbar.setIconSize(QSize(true_list.count(True)*30+10, 24))
        self.add_actions(exit_toolbar, (
            save_all_action, "Space", None, "Space", quit_action))
    else:
        self.add_actions(exit_toolbar, (quit_action,))
def combine_save_icons(self): #epics=False, hdf=False, elog=False):
    """Return a QIcon showing the icons of all enabled save destinations.

    The EPICS, HDF and ELOG images are painted side by side on one
    pixmap; which ones appear follows ``autopost_epics``,
    ``autopost_hdf`` and ``autopost_elog``.
    """
    switches = [self.autopost_epics, self.autopost_hdf, self.autopost_elog]
    with_epics, with_hdf, with_elog = switches
    n_enabled = switches.count(True)

    # Horizontal start position of each image
    if n_enabled == 3:
        x_epics, x_hdf, x_elog = 20, 150, 300
    elif n_enabled == 1:
        x_epics, x_hdf, x_elog = 0, 0, 0
    elif with_epics:
        x_epics, x_hdf, x_elog = 20, 150, 150
    elif with_hdf:
        x_epics, x_hdf, x_elog = 0, 20, 150
    else:
        x_epics, x_hdf, x_elog = 0, 0, 0

    icon = QIcon()
    canvas = QPixmap(130*n_enabled, 100) #420, 100
    canvas.fill(QColor("#dcdcdc")) #"#bcc6cc"

    epics_pixmap = QPixmap(":/epics.png")
    hdf_pixmap = QPixmap(":/hdfS.png")
    elog_pixmap = QPixmap(":/elog.png")

    brush = QPainter(canvas)
    if with_epics:
        brush.drawPixmap(x_epics, 0, epics_pixmap, 0, -20, 100, 160)
    if with_hdf:
        brush.drawPixmap(x_hdf, 0, hdf_pixmap, 0, -20, 100, 160)
    if with_elog:
        brush.drawPixmap(x_elog, 0, elog_pixmap)
    icon.addPixmap(canvas)
    brush.end()
    return icon
################# Actions
def add_actions(self, target, actions):
    """Populate a menu or toolbar from a sequence of entries.

    The string "Space" inserts a 6 pixel wide empty frame, ``None``
    inserts a separator and anything else is added as an action.
    """
    for entry in actions:
        if entry == "Space":
            gap = QFrame()
            gap.setFixedWidth(6)
            target.addWidget(gap)
            continue
        if entry is None:
            target.addSeparator()
            continue
        target.addAction(entry)
def create_action(self, text, slot=None, shortcut=None, icon=None, tip=None,
                  checkable=False):# signal="triggered()"):
    """Build a QAction owned by this window.

    Args:
        text: Caption of the action.
        slot: Callable connected to ``triggered``, if given.
        shortcut: Keyboard shortcut, if given.
        icon: Base name of the resource image ":/<icon>.png"; the special
            name "save_all" selects the combined save icon instead.
        tip: Tool tip text, if given.
        checkable: Make the action checkable.

    Returns:
        The configured QAction.
    """
    new_action = QAction(text, self)
    if icon == "save_all":
        new_action.setIcon(QIcon(self.combine_save_icons()))
        new_action.setIconText("Save All")
    elif icon is not None:
        new_action.setIcon(QIcon(":/{0}.png".format(icon)))
    if shortcut is not None:
        new_action.setShortcut(shortcut)
    if tip is not None:
        new_action.setToolTip(tip)
    if slot is not None:
        new_action.triggered.connect(slot)
    if checkable:
        new_action.setCheckable(True)
    return new_action
def verify_close_event(self):
    """Tell whether the application may be closed now.

    Returns:
        False while the analysis or HDF thread is running, or when the
        user declines to exit with unsaved measurement data; True
        otherwise.
    """
    retry_hint = "Please try again in a few seconds."

    if self.analysis_thread is not None and self.analysis_thread.isRunning():
        QMessageBox.information(
            self, "Exit", "Measurement in progress. " + retry_hint,
            QMessageBox.Ok)
        return False

    if self.hdf_thread and self.hdf_thread.isRunning():
        QMessageBox.information(
            self, "Exit", "HDF analysis in progress. " + retry_hint,
            QMessageBox.Ok)
        return False

    unsaved_measurement = (self.daq_analysis_completed
                           and not self.hdf_save_completed
                           and not self.from_hdf)
    if not unsaved_measurement or self.all_data is None:
        return True

    try:
        processed = self.all_data['Processed data']
        # Confirmation is only requested for data that is not a reanalysis
        if 'Reanalysis time' in processed and not processed[
                'Reanalysis time']:
            question = ("Are you sure you wish to exit " +
                        "without saving data to HDF?")
            answer = QMessageBox().warning(
                self, "Exit", question,
                QMessageBox.Yes | QMessageBox.No)
            if answer == QMessageBox.No:
                return False
    except KeyError:
        pass
    return True
@Slot()
def closeEvent(self, event):
    """Overrides the QMainWindow method.

    Saves the application settings, stops all monitors, terminates the
    channel access layer, closes every window and accepts the event.
    """
    #Close all dock widgets
    #self.removeDockWidget(self.hdf_dock_widget)
    farewell = "Closing Application"
    self.logger.info(farewell)
    print(farewell, flush=True)
    self.save_application_settings()
    QApplication.processEvents()
    # Stopping the monitors may on occasion lead to
    # "NO CYTHON CALLBACK MATCH FOUND" notices
    for shutdown_step in ("monitorStopAll", "terminate"):
        getattr(self.cafe, shutdown_step)()
        time.sleep(0.05)
    QApplication.closeAllWindows()
    event.accept()
@Slot()
def clear_log_window(self):
    """Ask for confirmation, then empty the message log window.

    Returns:
        False when the user answers No, the result of the log widget's
        ``clear()`` when the answer is Yes, None otherwise.
    """
    answer = QMessageBox.question(
        self, "Clear Message Log",
        "Do you wish to wipe out all messages from the log window?",
        QMessageBox.Yes | QMessageBox.No)
    if answer == QMessageBox.No:
        return False
    if answer == QMessageBox.Yes:
        return self.gui_frame.log_wgt.clear()
    return None
@Slot()
def open_stdout_log(self):
    """Slot to stdout_action: show the stdout log file in a dialog.

    A warning box is shown instead when the file does not exist or
    cannot be read.
    """
    log_title = self.appname + ": stdout"
    sys.stdout.flush()

    if not QFile.exists(self.stdlog_dest):
        message = "Error Log File: '{0}' does not exist!".format(
            self.stdlog_dest)
        QMessageBox.warning(self, log_title, message)
        return

    try:
        # The context manager closes the file even if reading fails;
        # undecodable bytes are replaced rather than raising.
        with open(self.stdlog_dest, errors="replace") as log_file:
            text = log_file.read()
    except OSError as err:
        message = "Error Log File: '{0}' could not be read!\n{1}".format(
            self.stdlog_dest, str(err))
        QMessageBox.warning(self, log_title, message)
        return

    text_edit = QPlainTextEdit()
    text_edit.setPlainText(text)
    text_edit.setReadOnly(True)
    bgcolor = ("QPlainTextEdit{background-color:" +
               self.settings.data["StyleGuide"]["bgErrorLogFile"] + "}")
    text_edit.setStyleSheet(bgcolor)
    text_edit.verticalScrollBar().setValue(
        text_edit.verticalScrollBar().maximum())
    text_edit.horizontalScrollBar().setValue(
        text_edit.horizontalScrollBar().maximum())

    scroll_area = QScrollArea()
    scroll_area.setBackgroundRole(QPalette.Light)
    scroll_area.setWidgetResizable(True)
    scroll_area.setMinimumHeight(600)
    scroll_area.setMinimumWidth(700)
    scroll_area.setWindowTitle(log_title)
    scroll_area.verticalScrollBar().setValue(
        scroll_area.verticalScrollBar().maximum())
    scroll_area.horizontalScrollBar().setValue(
        scroll_area.horizontalScrollBar().maximum())
    scroll_area.verticalScrollBar().setSliderPosition(
        scroll_area.verticalScrollBar().maximum())
    scroll_area.horizontalScrollBar().setSliderPosition(
        scroll_area.horizontalScrollBar().maximum())
    scroll_area.setWidget(text_edit)
    # Keep a reference to the scroll area on the text widget
    text_edit.scrollArea = scroll_area

    layout = QVBoxLayout()
    layout.addWidget(scroll_area)
    layout.setContentsMargins(0, 0, 0, 0)

    qdialog = QDialog(self)
    qdialog.setWindowTitle(log_title)
    qdialog.setLayout(layout)
    qdialog.show()
#def save_to_hdf_started(self):
# QApplication.processEvents()
#def save_to_hdf_finished(self):
# QApplication.processEvents()
@Slot()
def write_to_destinations(self):
    """Save to every enabled destination: EPICS, HDF, then ELOG.

    Pending GUI events are processed before the first and after each
    step.
    """
    #disable signal
    #self.gui_frame.autopost_save.blockSignals(True)
    QApplication.processEvents()
    destinations = (("autopost_epics", "save_to_epics"),
                    ("autopost_hdf", "save_to_hdf"),
                    ("autopost_elog", "send_to_elog"))
    # Flags and methods are looked up only when their turn comes
    for flag_name, method_name in destinations:
        if getattr(self, flag_name):
            getattr(self, method_name)()
        QApplication.processEvents()
    #enable signal
    #self.gui_frame.autopost_save.blockSignals(False)
def set_new_hdf_filename(self, time_in_seconds=None,
                         reanalysis_time_in_seconds=None):
    """Set ``hdf_filename`` to ``hdf_dest`` + dated name + ".h5".

    Both arguments are passed on to ``add_date_to_path``.
    """
    dated_name = self.add_date_to_path(time_in_seconds,
                                       reanalysis_time_in_seconds)
    self.hdf_filename = self.hdf_dest + dated_name + ".h5"
def add_date_to_path(self, time_in_seconds=None,
                     reanalysis_time_in_seconds=None):
    """Return the dated file name stem used for saved output.

    The stem is "<appname>_<date>_<time>", followed by
    "_re<date>_<time>" of the reanalysis when one is given. When
    ``elog_add_date_to_dir`` is set the stem is prefixed with
    "<year>/<month>/<day>/", taken from the reanalysis time if given and
    from the measurement time otherwise.

    Args:
        time_in_seconds: Measurement timestamp; the current time is used
            when None.
        reanalysis_time_in_seconds: Reanalysis timestamp or None.
    """
    stamp_format = "%Y-%m-%d_%H:%M:%S"
    measured_at = datetime.fromtimestamp(
        time.time() if time_in_seconds is None else time_in_seconds)
    label = measured_at.strftime(stamp_format)

    folder_date = measured_at
    if reanalysis_time_in_seconds is not None:
        folder_date = datetime.fromtimestamp(reanalysis_time_in_seconds)
        label += "_re" + folder_date.strftime(stamp_format)

    stem = self.appname + "_" + label
    if not self.elog_add_date_to_dir:
        return str(stem)
    return str(folder_date.strftime("%Y/%m/%d") + "/" + stem)
def verify_save_to_hdf(self):
    """Check whether a save to HDF may proceed.

    To be called by the user from save_to_hdf. An information box
    explains every refusal.

    Returns:
        True when the caller should continue with the save; False while
        a measurement or HDF analysis is running, when no measurement
        has been completed, or when the data was already saved.
    """
    retry_hint = "Please try again in a few seconds."
    busy_checks = ((self.analysis_thread, "Measurement in progress. "),
                   (self.hdf_thread, "HDF analysis in progress. "))

    refusal = None
    for worker, prefix in busy_checks:
        if worker is not None and worker.isRunning():
            refusal = prefix + retry_hint
            break
    else:
        if not self.daq_analysis_completed:
            refusal = "No data to save to hdf; no measurement undertaken!"
        elif self.hdf_save_completed:
            refusal = "Data previously saved to hdf:\n{0}".format(
                self.hdf_filename)

    if refusal is None:
        return True

    QMessageBox.information(self, "HDF", refusal, QMessageBox.Ok)
    QApplication.processEvents()
    return False
def add_to_hdf(self, dataH5=None, proc=True, raw=False):
    """Hook for application specific HDF content. Optional.

    Meant to be overwritten by the user; this base implementation
    does nothing and returns None.
    """
    # A message box reporting the missing implementation was disabled:
    # QM = QMessageBox()
    # QM.setText(
    #     str(NotImplementedError("add_to_hdf method has not been " +
    #         "implemented in this application. \n If " +
    #         "not required, consider removing the hdf" +
    #         "icon from the application/config file."))
    # )
    # QM.exec()
    return None
@Slot()
def save_to_hdf(self):
    """Placeholder slot, to be overwritten by the user.

    This base implementation only shows a message box saying that the
    method has not been implemented.
    """
    reminder = NotImplementedError(
        "save_to_hdf method has not been " +
        "implemented in this application. \n" +
        "If not required, consider removing the " +
        "icon from the application/config file.")
    box = QMessageBox()
    box.setText(str(reminder))
    box.exec()
def add_pvs_to_hdf(self, dataH5, top_group="experiment", pv_list=None,
                   from_hdf=False):
    """ Write process-variable (PV) values into a new group of an HDF file.

    Args:
        dataH5: object providing create_group(name); the group returned
            must provide create_dataset(name, data=...).
        top_group: name of the group created in dataH5.
        pv_list: PV names. When empty or None nothing is written.
        from_hdf: when True the values are copied from
            self.all_data[top_group] (if that key exists) instead of being
            read through cafe.

    Returns:
        None when pv_list is empty; otherwise True if every dataset could
        be created and False if at least one attempt raised TypeError
        (each failure is printed and emitted via trigger_log_message).
    """
    if not pv_list:
        return

    isOK = True
    grp_data = dataH5.create_group(top_group)

    if from_hdf:
        # Re-analysis run: copy what was loaded earlier; cafe is not used.
        if top_group in self.all_data:
            for key1, entries in self.all_data[top_group].items():
                for key2, value in entries.items():
                    dt_name = key1 + "/" + key2
                    try:
                        # value[()] is evaluated inside the try so that a
                        # TypeError from it is reported like any other.
                        grp_data.create_dataset(dt_name, data=value[()])
                    except TypeError as err:
                        isOK = False
                        _mess = ("Error in saving to HDF ['{0}']['{1}']. " +
                                 "{2}").format(top_group, dt_name, str(err))
                        print(_mess, flush=True)
                        self.trigger_log_message.emit(
                            MsgSeverity.WARN.name, _pymodule, _line(), _mess,
                            {})
        return isOK

    self.cafe.attachContext(pv_list[0])
    for pv in pv_list:
        self.cafe.setGetActionWhenMonitorPolicy(
            pv, self.cyca.GET_FROM_CACHE)
    pv_dict = self.cafe.getDictionary(pv_list)[0]

    for key, value in pv_dict.items():
        # Dataset name is "<part before separator>/<next part>"; ':' is
        # tried first, then the first '-'.
        k = key.split(":")
        if len(k) <= 1:
            k = key.split("-", 1)
        if len(k) > 1:
            dt_name = k[0] + "/" + k[1]
        else:
            # No separator at all: store under the plain PV name rather
            # than failing with IndexError.
            dt_name = key
        try:
            grp_data.create_dataset(dt_name, data=value)
        except TypeError as err:
            isOK = False
            # Group first, dataset second, as in the from_hdf branch.
            _mess = ("Error in saving to HDF ['{0}']['{1}'] " +
                     "{2}").format(top_group, dt_name, str(err))
            print(_mess, flush=True)
            self.trigger_log_message.emit(
                MsgSeverity.WARN.name, _pymodule, _line(), _mess, {})
    return isOK
def add_general_to_hdf(self, dataH5):
    """ Write general meta-data into a new "general" group of dataH5.

    One dataset is created per entry (comment, author, application,
    version, filepath, user, created); dataset names are the lower-cased
    keys with blanks replaced by underscores.

    Returns:
        True if every dataset was created, False if at least one
        create_dataset call raised TypeError (logged, not re-raised).
    """
    if self.hdf_dialog is None:
        comment = str("HDF file generated via Save All button")
    else:
        comment = self.hdf_dialog.user_dict['Comment']

    metadata = {
        'Comment': comment,
        'Author': self.author,
        'Application': self.pymodule,
        'Version': self.appversion,
        'Filepath': self.source_file,
        'User': getpass.getuser(),
    }

    # Use the recorded measurement time when there is one, else "now".
    created = None
    if self.all_data is not None:
        ambient = self.all_data['Ambient data']
        if 'Time in seconds' in ambient:
            created = datetime.fromtimestamp(ambient['Time in seconds'])
    if created is None:
        created = datetime.now()
    metadata['Created'] = created.strftime("%Y-%m-%d_%H:%M:%S")

    all_written = True
    group = dataH5.create_group("general")
    for label, entry in metadata.items():
        dataset_name = label.lower().replace(' ', '_')
        try:
            group.create_dataset(dataset_name, data=entry)
        except TypeError as err:
            all_written = False
            _mess = ("Error in saving to HDF ['general']['{0}']. " +
                     "{1}").format(label, str(err))
            print(_mess, flush=True)
            self.trigger_log_message.emit(
                MsgSeverity.WARN.name, _pymodule, _line(), _mess, {})
    return all_written
@Slot()
def save_to_hdf_dialog(self):
    """ Open the QSaveHDF dialog so that the user can enter additional
    meta-data before saving.

    Returns False when verify_save_to_hdf() refuses; otherwise the dialog
    is created, stored in self.hdf_dialog, and None is returned.
    """
    if not self.verify_save_to_hdf():
        return False

    input_options = OrderedDict()
    have_data = self.all_data is not None
    ts_in_seconds = None

    if have_data:
        ts_in_seconds = self.all_data['Ambient data']['Time in seconds']
        processed = self.all_data['Processed data']
        now_in_seconds = None
        # The re-analysis time is only used when it is present and set.
        if ('Reanalysis time in seconds' in processed
                and processed['Reanalysis time']):
            now_in_seconds = processed['Reanalysis time in seconds']
        self.set_new_hdf_filename(ts_in_seconds, now_in_seconds)

    input_options['Destination'] = self.hdf_filename
    if have_data:
        # Guarded like the block above: self.all_data may be None, in
        # which case there is no time stamp to pass on.
        input_options['Time in seconds'] = ts_in_seconds

    self.hdf_dialog = QSaveHDF(self, input_options=input_options,
                               from_dialog=True)
def verify_send_to_elog(self):
    """ Check whether the ELOG dialog may be opened.

    Returns False (after an information box) only while the measurement
    thread is running. In every other case True is returned; a notice is
    shown first when no measurement has been completed, or when the data
    have not been saved to HDF and no HDF save thread is running.
    """
    if self.analysis_thread is not None and self.analysis_thread.isRunning():
        _mess = ("Measurement in progress. " +
                 "Please try again in a few seconds.")
        QMessageBox.information(self, "ELOG", _mess, QMessageBox.Ok)
        return False

    notice = None
    if not self.daq_analysis_completed:
        notice = ("Opening ELOG, but please note that no new measurement " +
                  "has been undertaken")
    else:
        saving = (self.save_hdf_thread is not None
                  and self.save_hdf_thread.isRunning())
        if not saving and not (self.hdf_save_completed or self.from_hdf):
            notice = ("Opening ELOG, but please note that data have not " +
                      "been saved to HDF. " +
                      "<br>Click on the HDF icon to do this if desired")

    if notice is not None:
        QMessageBox.information(self, "ELOG", notice, QMessageBox.Ok)
    return True
@Slot()
def send_to_elog(self):
    """ Response to elog_action; normally overwritten.

    Opens QSendToELOG with neutral defaults once verify_send_to_elog()
    has agreed.
    """
    if not self.verify_send_to_elog():
        return

    # Defaults: no logbook, index 0 for category/domain/section, an empty
    # message and no attachments.
    elog_options = dict(
        logbook=None,
        categoryIdx=0,
        domainIdx=0,
        sectionIdx=0,
        title=self.title,
        message="",
        attachFile=[],
    )
    QSendToELOG(self, **elog_options)
    QApplication.processEvents()
def initiate_cycling(self, pv_list=[]):
    """ Start cycling by writing 2 to every PV whose name ends in "CYCLE".

    PV names that do not contain "CYCLE" are reported in the log window
    and the status bar. Nothing is written when
    self.input_parameters['simulation'] is set.

    Returns:
        False if pv_list is empty or contains no name ending in "CYCLE";
        True otherwise (also when the write reported an error, which is
        forwarded to send_to_log_window).
    """
    if not pv_list:
        return False

    pv_cycle_quads = [name for name in pv_list if name.endswith("CYCLE")]
    print("Initiate Cycle_quads", pv_cycle_quads)

    if pv_cycle_quads != pv_list:
        for name in pv_list:
            if "CYCLE" in name:
                continue
            _mess = "Wrong PV name for cycling! pv = {0}".format(name)
            self.show_log_message(MsgSeverity.ERROR, _pymodule, _line(),
                                  _mess)
            self.statusbar.showMessage(_mess)

    if not pv_cycle_quads:
        return False
    if self.input_parameters['simulation']:
        return True

    status, status_list = self.cafe.setScalarList(
        pv_cycle_quads, [2] * len(pv_cycle_quads))
    if status != self.cyca.ICAFE_NORMAL:
        self.send_to_log_window(pv_list=pv_cycle_quads, status=status,
                                status_list=status_list,
                                operation='set', pymodule=_pymodule,
                                line=_line())
    return True
def is_cycling(self, pv_cycle_quads=[]):
    """ Return True if any of the given PVs currently reads "Cycling".

    An empty list yields False without any channel access. A read whose
    overall status is not ICAFE_NORMAL is reported through
    send_to_log_window; the values returned are evaluated regardless.
    """
    if not pv_cycle_quads:
        return False

    readings, status, per_pv_status = self.cafe.getScalarList(
        pv_cycle_quads, dt='native')
    if status != self.cyca.ICAFE_NORMAL:
        self.send_to_log_window(pv_list=pv_cycle_quads, status=status,
                                status_list=per_pv_status,
                                operation='get', pymodule=_pymodule,
                                line=_line())
    return "Cycling" in readings
def wait_for_cycling(self, deflector, profile_monitor, pv_cycle_quads=[],
                     timeout=30):
    """ Block (while keeping the GUI responsive) until cycling has ended.

    Polls is_cycling() every 5 ms and processes Qt events in between.

    Args:
        deflector: only used in the timeout warning text.
        profile_monitor: only used in the timeout warning text.
        pv_cycle_quads: PVs handed to is_cycling(); when empty the method
            returns immediately.
        timeout: maximum waiting time in seconds; once exceeded a warning
            box is shown and waiting stops.

    Returns:
        None in all cases.
    """
    if not pv_cycle_quads:
        return

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    start = time.monotonic()
    while self.is_cycling(pv_cycle_quads):
        time.sleep(0.005)
        QApplication.processEvents()
        if time.monotonic() - start > timeout:
            _mess = ("Timeout ({0}s) reached for Cycling. " +
                     "\nPlease check set points for " +
                     "\nDeflector: {1} " +
                     "\nProfile Monitor: {2}").format(
                         timeout, deflector, profile_monitor)
            QMessageBox.warning(self, "Set Optics", _mess, QMessageBox.Ok)
            break
@Slot()
def set_optics(self):
    """ Abstract slot, to be overwritten by the user.

    The base implementation only shows a "not implemented" message box.
    """
    notice = NotImplementedError("set_optics method has not been " +
                                 "implemented in this application. \n")
    box = QMessageBox()
    box.setText(str(notice))
    box.exec()
@Slot()
def restore_optics(self):
    """ Abstract slot, to be overwritten by the user.

    The base implementation only shows a "not implemented" message box.
    """
    notice = NotImplementedError("restore_optics method has not been " +
                                 "implemented in this application. \n")
    box = QMessageBox()
    box.setText(str(notice))
    box.exec()
def verify_save_to_epics(self):
    """ Pre-flight check, to be called by the user from save_to_epics.

    Returns False silently for HDF re-analysis results (a line is printed)
    and for simulation runs. Returns False after an information box while
    a measurement is running, when the data dictionary is empty, or when
    no measurement has been completed. Returns True otherwise.
    """
    data = self.all_data
    if data:
        processed = data['Processed data']
        # Data from hdf analysis - do not save to epics
        if 'Reanalysis time' in processed and processed['Reanalysis time']:
            print("HDF RUN - data not written to epics")
            return False
        if data['Input data']['simulation']:
            return False
    elif self.input_parameters['simulation']:
        return False

    complaint = None
    if self.analysis_thread is not None and self.analysis_thread.isRunning():
        complaint = ("Measurement in progress. " +
                     "Please try again in a few seconds.")
    elif not self.all_data:
        complaint = "No data to save to epics: data dictionary is empty!"
    elif not self.daq_analysis_completed:
        complaint = "No data to save to epics; no measurement undertaken!!"

    if complaint is None:
        return True

    QMessageBox.information(self, "EPICS", complaint, QMessageBox.Ok)
    QApplication.processEvents()
    return False
def save_to_epics(self):
    """ Abstract method, to be overwritten by the user.

    The base implementation only shows a "not implemented" message box.
    """
    notice = NotImplementedError(
        "save_to_epics method has not been " +
        "implemented in this application. \n" +
        "If not required, consider removing the " +
        "icon from the application/config file.")
    box = QMessageBox()
    box.setText(str(notice))
    box.exec()
def send_to_log_window(self, pv_list: list = [], status: int = 1,
                       status_list: list = [], operation: str = 'set/get',
                       pymodule=_pymodule, line: int = _line()):
    """ Report the failing PVs of a group operation in the log window.

    Does nothing when status equals ICAFE_NORMAL. Otherwise one ERROR
    entry is logged per PV whose own status is not ICAFE_NORMAL, followed
    by a summary in the status bar.

    Args:
        pv_list: PV names, paired element-wise with status_list.
        status: overall status of the group operation.
        status_list: per-PV status codes.
        operation: label used in the log text.
        pymodule: module name reported in the log entry.
        line: line number reported in the log entry. The default is
            evaluated once, when this method is defined.
    """
    normal = self.cyca.ICAFE_NORMAL
    if status == normal:
        return

    failures = [(code, pv) for code, pv in zip(status_list, pv_list)
                if code != normal]
    for code, pv in failures:
        text = "Error in '{1}' for pv {0}.".format(pv, operation)
        details = {
            'statusCode': "{0} {1}".format(
                str(code), self.cafe.getStatusCodeAsString(code)),
            'statusInfo': self.cafe.getStatusInfo(code),
        }
        self.show_log_message(MsgSeverity.ERROR, pymodule, line,
                              text, details)

    summary = ("{0} of the {1} PV operations resulted in an error. " +
               "See Log window").format(len(failures), len(status_list))
    self.statusbar.showMessage(summary)
def send_to_epics(self, pv_dict: dict = None, pv_names: list = None,
                  pv_values: list = None) -> (int, list):
    """ Write a set of values to EPICS channels through cafe.

    Args:
        pv_dict: mapping PV name -> value. When given it takes precedence
            over pv_names/pv_values; numpy arrays are converted to lists.
        pv_names: PV names, used only when pv_dict is None.
        pv_values: values matching pv_names one-to-one.

    Returns:
        (status, status_list) of the group write. If pv_names and
        pv_values differ in length the mismatch is logged and None is
        returned instead.

    Failures are written to the log window, summarised in the status bar
    and shown in a warning box.
    """
    if pv_dict is not None:
        pv_names = list(pv_dict.keys())
        pv_values = [val.tolist() if isinstance(val, np.ndarray) else val
                     for val in pv_dict.values()]
        print("SAVE TO EPICS", flush=True)
        print(pv_names, flush=True)
        print(pv_values, flush=True)
    elif len(pv_names) != len(pv_values):
        _mess = ("len(pv_values)={0} does not match " +
                 "len(pv_names)={1}").format(len(pv_values),
                                             len(pv_names))
        self.show_log_message(MsgSeverity.ERROR, _pymodule, _line(),
                              _mess)
        self.statusbar.showMessage(_mess)
        return

    self.cafe.openPrepare()
    self.cafe.open(pv_names)
    self.cafe.openNowAndWait(0.4)

    status = self.cyca.ICAFE_NORMAL
    status_list = []
    try:
        status, status_list = self.cafe.setCompoundList(pv_names, pv_values)
    except Exception:
        # Best effort: flag every PV as failed so the report below still
        # works. The status constant is an attribute of self.cyca.
        print("Exception raised in cafe.setCompoundList", flush=True)
        status = self.cyca.ECAFE_BADTYPE
        for pv, val in zip(pv_names, pv_values):
            print("pv/val", pv, val, flush=True)
            status_list.append(self.cyca.ECAFE_BADTYPE)

    if status != self.cyca.ICAFE_NORMAL:
        ibad = 0
        for _status, _pv in zip(status_list, pv_names):
            if _status != self.cyca.ICAFE_NORMAL:
                ibad += 1
                _mess = "Error in 'set' for pv {0}.".format(_pv)
                _options = {}
                _options['statusCode'] = "{0} {1}".format(
                    str(_status), self.cafe.getStatusCodeAsString(_status))
                _options['statusInfo'] = self.cafe.getStatusInfo(_status)
                self.show_log_message(MsgSeverity.ERROR, _pymodule,
                                      _line(), _mess, _options)
        _mess = ("{0} of the {1} PVs could not be set. " +
                 "See Log window").format(ibad, len(status_list))
        self.statusbar.showMessage(_mess)
        QMessageBox.warning(self, _pymodule, _mess, QMessageBox.Ok)
    else:
        _mess = "Data saved to epics successully"
        self.show_log_message(MsgSeverity.INFO, _pymodule, _line(), _mess)
        self.statusbar.showMessage(_mess)

    return status, status_list
@Slot()
def send_to_printer(self):
    """ Response to printer_action.

    Opens a print dialog (A4, landscape, high resolution). If it is
    accepted, the central widget is rendered centred on the page, scaled
    uniformly so that it fits.
    """
    printer = QPrinter(QPrinter.HighResolution)
    self.printer = printer
    printer.setPaperSize(QPrinter.A4)
    printer.setOrientation(QPrinter.Landscape)

    dialog = QPrintDialog(printer, self)
    if not dialog.exec_():
        return

    painter = QPainter(printer)
    widget = self.centralWidget()
    page = printer.pageRect()
    paper = printer.paperRect()

    # The smaller of the two ratios keeps the aspect ratio and fits the page
    scale = min(page.width() / (widget.width()),
                page.height() / (widget.height()))
    painter.translate(paper.x() + page.width() / 2,
                      paper.y() + page.height() / 2)
    painter.scale(scale, scale)
    painter.translate(-widget.width()/2, -widget.height()/2)
    widget.render(painter)
@Slot()
def show_about(self):
    """ Show the "About" box; to be overridden by the application.
    """
    QApplication.processEvents()
    about_text = """<b>{0}</b> v {1}
<p>Copyright &copy; Paul Scherrer Institut (PSI).
All rights reserved.</p>
<p>Author: J. Chrin </p>
<p>1st Responsible: J. Chrin, Tel. 2930,
+41 76 206 0988, jan.chrin@psi.ch </p>
<p>2nd Responsible: 2nd name if appropriate </p>
<p>A main-window style application with menus,
a status bar, a toolbar and log window </p>
<p>Python {2} - Qt {3} - PyQt {4} <br>
cafe {5} - epics {6} on {7}""".format(
        _pymodule, _appversion, platform.python_version(),
        QT_VERSION_STR, PYQT_VERSION_STR,
        self.cafe.CAFE_version(), self.cafe.EPICS_version(),
        platform.system())
    QMessageBox.about(self, "About", about_text)
    QApplication.processEvents()
@Slot()
def show_help(self):
    """ Open the help browser on the application's index page, which is
    looked up in qrc_resources under ":/help/<appname>".
    """
    app = self.appname
    browser = HelpBrowser(":/help/" + app, app + "/index.html", self)
    browser.show()
@Slot()
def take_screenshot(self):
    """ Open QScreenshot for the items and titles registered so far.
    """
    screenshot_options = {
        "window_title": self.appname,
        "screen_items": self.screenshot_items,
        "screen_titles": self.screenshot_titles,
    }
    QScreenshot(self, **screenshot_options)
def add_screenshot(self, title=None, item=None):
    """ Register a (title, item) pair for screenshots.

    The pair is appended only when both arguments are truthy and neither
    the title nor the item has been registered before.
    """
    if not (title and item):
        return
    if title in self.screenshot_titles or item in self.screenshot_items:
        return
    self.screenshot_titles.append(title)
    self.screenshot_items.append(item)
def clear_screenshot(self, title=None, item=None):
    """ Remove one registered screenshot pair, or all of them.

    When both title and item are truthy they are removed, provided both
    are currently registered. In every other case both lists are emptied.
    """
    if not (title and item):
        self.screenshot_titles.clear()
        self.screenshot_items.clear()
        return
    if title in self.screenshot_titles and item in self.screenshot_items:
        self.screenshot_titles.remove(title)
        self.screenshot_items.remove(item)
############################### Init
def init_statusbar_wgt(self):
    """ Configure the status bar: sunken permanent label, no size grip,
    and an initial "Initialization complete" message.
    """
    label = self.statusbar_label
    label.setFrameStyle(QFrame.StyledPanel | QFrame.Sunken)

    bar = self.statusbar
    bar.setSizeGripEnabled(False)
    bar.addPermanentWidget(label)
    bar.clearMessage()
    bar.showMessage("Initialization complete")
def init_progressbar_wgt(self):
    """ Configure the progress bar.

    Defines the three object names used for the bar ("simulation", "blue",
    "abort"), selects the standard one, and sets up a hidden, centred bar
    ranging from PROGRESS_THREAD_START to PROGRESS_THREAD_END.
    """
    (self.progressbar_simulation,
     self.progressbar_standard,
     self.progressbar_abort) = ("simulation", "blue", "abort")
    self.progressbar_color = self.progressbar_standard

    bar = self.progressbar
    bar.setObjectName(self.progressbar_color)
    bar.setRange(PROGRESS_THREAD_START, PROGRESS_THREAD_END)
    bar.setTextVisible(True)
    bar.setAlignment(Qt.AlignCenter)
    bar.setVisible(False)
def init_hdf_analysis_wgt(self):
    """ Build the (initially hidden) "HDF5 Analysis" dock widget.

    The group box's analyze button is connected to start_hdf_thread and
    the dock is added to the top dock area.
    """
    groupbox = HDF5GroupBox(self, title="HDF5 Analysis", layout="Hor")
    self.h5_groupbox = groupbox

    dock = self.hdf_dock_widget
    dock.setObjectName("hdfDockWidget")
    dock.setVisible(False)
    dock.setContentsMargins(9, 9, 9, 9)

    groupbox.analyze_h5_widget.clicked.connect(self.start_hdf_thread)
    dock.setWidget(groupbox)
    self.addDockWidget(Qt.TopDockWidgetArea, dock)
def restore_application_settings(self):
    """ Restore platform-independent application settings.

    Reads the recent-files list, window geometry and window state from
    QSettings (each only if a value is stored) and records the resulting
    geometry in self.application_geometry.
    """
    qsettings = QSettings()

    recent = qsettings.value("RecentFiles")
    if recent:
        # The list is stored joined with ", "; strip each entry so the
        # separator's blank does not end up in the file names.
        self.application_recent_files = [
            entry.strip() for entry in str(recent).split(',')]

    geometry = qsettings.value("BaseWindow/Geometry")
    if geometry:
        self.restoreGeometry(geometry)

    state = qsettings.value("BaseWindow/State")
    if state:
        self.restoreState(state)

    self.application_geometry = self.geometry()
def save_application_settings(self):
    """ Save platform-independent application settings.

    Stores the last file and the recent-files list (each only when set)
    plus the window geometry and state in QSettings.
    """
    entries = []
    if self.filename:
        entries.append(("LastFile", str(self.filename)))
    if self.application_recent_files:
        entries.append(
            ("RecentFiles", ', '.join(self.application_recent_files)))
    entries.append(("BaseWindow/Geometry", self.saveGeometry()))
    entries.append(("BaseWindow/State", self.saveState()))

    settings = QSettings()
    for key, value in entries:
        settings.setValue(key, value)
# To be overloaded by user
def load_hdf_file(self):
    """ Load an HDF5 file into the analysis dictionary.

    The base implementation loads nothing and returns an empty dict.
    """
    return dict()
@Slot()
def start_hdf_thread(self):
    """ Start the HDF re-analysis thread.

    Nothing is started when no analysis procedure is configured (logged
    as an error) or while a measurement or another HDF analysis is still
    running (information box).
    """
    if not self.analysis_procedure:
        mess = "HDF analysis thread not configured for this application"
        self.show_log_message(MsgSeverity.ERROR, _pymodule, _line(), mess)
        self.statusbar.showMessage(mess)
        return

    busy = None   # (box title, box text) of the activity in the way
    if self.analysis_thread and self.analysis_thread.isRunning():
        busy = ("Measurement in progress..",
                "Measurement already in progress.")
    elif self.hdf_thread and self.hdf_thread.isRunning():
        busy = ("HDF analysis in progress..",
                "HDF analysis already in progress.")
    if busy is not None:
        QMessageBox.information(self, busy[0], busy[1], QMessageBox.Ok)
        return

    self.statusbar.showMessage("Loading {0}".format(
        self.hdf_filename_loaded))
    self.trigger_progressbar_str.emit(
        5, "Loading HDF file for analysis")

    # all_data is passed as None: the data are not loaded here.
    worker = self.HDFThread(self, self.analysis_procedure, all_data=None)
    self.hdf_thread = worker
    worker.trigger_thread_event.connect(self.receive_analysis_results)
    worker.started.connect(self.hdf_thread_started)
    worker.finished.connect(self.hdf_thread_finished)
    worker.start()
    QApplication.processEvents()
# To be overloaded by user
def verify_analysis_preconditions(self):
    """ Hook checked by start_analysis_thread before the thread is created.

    The base implementation imposes no precondition and returns True.
    """
    preconditions_met = True
    return preconditions_met
@Slot()
def start_analysis_thread(self):
    """ Slot to the self.start_wgt button trigger in guiframe.py.

    Creates and starts the analysis thread unless no analysis procedure
    is configured, a measurement or HDF analysis is already running, or
    verify_analysis_preconditions() returns a false value.
    """
    if not self.analysis_procedure:
        mess = "Analysis thread not configured for this application"
        self.show_log_message(MsgSeverity.ERROR, _pymodule, _line(), mess)
        self.statusbar.showMessage(mess)
        return

    busy = None   # (box title, box text) of the activity in the way
    if self.analysis_thread and self.analysis_thread.isRunning():
        busy = ("Measurement in progress..",
                "Measurement already in progress.")
    elif self.hdf_thread and self.hdf_thread.isRunning():
        busy = ("HDF analysis in progress..",
                "HDF analysis already in progress.")
    if busy is not None:
        QMessageBox.information(self, busy[0], busy[1], QMessageBox.Ok)
        return

    if not self.verify_analysis_preconditions():
        return

    worker = self.AnalysisThread(
        self, self.analysis_procedure, self.input_parameters)
    self.analysis_thread = worker
    worker.trigger_thread_event.connect(self.receive_analysis_results)
    worker.started.connect(self.analysis_thread_started)
    worker.finished.connect(self.analysis_thread_finished)
    worker.start()
    QApplication.processEvents()
@Slot()
def analysis_thread_started(self):
    """ Switch the widgets to their in-measurement state
    (gui_frame.in_measurement_procedure) and process pending events.
    """
    frame = self.gui_frame
    frame.in_measurement_procedure()
    QApplication.processEvents()
@Slot()
def analysis_thread_finished(self):
    """ Reset the widgets to their initial pre-measurement state
    (gui_frame.reset_procedure) and process pending events.
    """
    frame = self.gui_frame
    frame.reset_procedure()
    QApplication.processEvents()
@Slot()
def hdf_thread_started(self):
    """ Switch the widgets to their HDF-analysis state
    (gui_frame.in_hdf_measurement_procedure) and process pending events.
    """
    frame = self.gui_frame
    frame.in_hdf_measurement_procedure()
    QApplication.processEvents()
@Slot()
def hdf_thread_finished(self):
    """ Reset the widgets to their initial state after an HDF analysis
    (gui_frame.hdf_reset_procedure) and process pending events.
    """
    frame = self.gui_frame
    frame.hdf_reset_procedure()
    QApplication.processEvents()
@Slot(dict)
def receive_analysis_results(self, all_dict):
    """ Store the results emitted by a worker thread and refresh the GUI.

    Connected to trigger_thread_event of both the analysis thread and the
    HDF thread. The dictionary is kept in self.all_data, the canvases are
    updated from all_dict['Figure data'], the results widgets are filled
    from all_dict['Processed data']['Results'] when available, and every
    tab widget inspected below is switched to its "Plots" tab if it has
    one.

    Args:
        all_dict: results dictionary; must contain 'Figure data'.
    """
    self.all_data = all_dict
    print("self.all_data", self.all_data.keys(), flush=True)
    self.gui_frame.canvas_update(all_dict['Figure data'])
    if self.gui_frame.results_output_wgt_dict:
        # Best effort: any failure (e.g. no 'Results' entry) is silently
        # ignored and the results widgets are left untouched.
        try:
            results_data = all_dict['Processed data']['Results']
            self.gui_frame.send_to_results_output_wgt(results_data)
        except:
            pass
    # Default to the first results tab; the loop further down selects
    # "Plots" instead when such a tab exists.
    self.gui_frame.results_tab_wgt.setCurrentIndex(0)
    if "GUITree" in self.settings.data:
        # Tree layout: only the sub-tabs of the currently selected central
        # tab are inspected.
        j = self.gui_frame.central_tab_widget.currentIndex()
        for i in range(self.gui_frame.level1_tab_wgt[j].count()):
            print(j, i, self.gui_frame.level1_tab_wgt[j].tabText(i), flush=True)
            if self.gui_frame.level1_tab_wgt[j].tabText(i) == "Plots":
                self.gui_frame.level1_tab_wgt[j].setCurrentIndex(i)
            else:
                pass
    else:
        for i in range(self.gui_frame.central_tab_widget.count()):
            print(i, self.gui_frame.central_tab_widget.tabText(i), flush=True)
            if self.gui_frame.central_tab_widget.tabText(i) == "Plots":
                self.gui_frame.central_tab_widget.setCurrentIndex(i)
            else:
                pass
    # NOTE(review): the two loops below are assumed to run for both
    # layouts (i.e. not only in the non-GUITree branch) - confirm.
    for i in range(self.gui_frame.measurement_tab_wgt.count()):
        print(i, self.gui_frame.measurement_tab_wgt.tabText(i), flush=True)
        if self.gui_frame.measurement_tab_wgt.tabText(i) == "Plots":
            self.gui_frame.measurement_tab_wgt.setCurrentIndex(i)
        else:
            pass
    for i in range(self.gui_frame.results_tab_wgt.count()):
        print(i, self.gui_frame.results_tab_wgt.tabText(i), flush=True)
        if self.gui_frame.results_tab_wgt.tabText(i) == "Plots":
            self.gui_frame.results_tab_wgt.setCurrentIndex(i)
        else:
            pass
    print("receive_analysis_results=========================>", flush=True)
@Slot()
def receive_abort_analysis(self):
    """ Instantiate the action on abort.

    Does nothing if the analysis procedure's abort flag is already set.
    Otherwise the GUI is switched to its abort state and the procedure's
    trigger_abort signal is emitted.
    """
    if not self.analysis_procedure.abort:
        self.gui_frame.in_abort_procedure()
        # Trigger abort signal to the analysis thread
        self.analysis_procedure.trigger_abort.emit()
        QApplication.processEvents()
@Slot(str, str, int, str, dict)
def receive_log_message(self, severity, module, line, message, options={}):
    """ Receive a message from a thread and route it to the log window;
    the message text is also shown in the status bar.
    """
    log_entry = (severity, module, line, message, options)
    self.show_log_message(*log_entry)
    self.statusbar.showMessage(message)
@Slot(int)
@Slot(int, str)
def receive_progressbar(self, value=0,
                        message="Measurement in progress..."):
    """ Update the progress bar from a progress report.

    Args:
        value: one of the PROGRESS_THREAD_* codes, or any other number,
            which is then shown as the bar's value together with message.
        message: text shown for ordinary progress values. For
            PROGRESS_THREAD_ABORTING the first integer found in it is
            used as the bar's value (10 if there is none).

    Besides the bar itself, self.daq_analysis_completed is updated on
    start, abort and end, and self.hdf_save_completed is reset on end
    when self.autopost_hdf is set.
    """
    self.progressbar.setVisible(True)
    # Simulation runs use their own object name for the bar; a missing
    # "simulation" key simply leaves the current one in place.
    try:
        if self.input_parameters["simulation"] :
            self.progressbar_color = self.progressbar_simulation
    except KeyError:
        pass
    if value == PROGRESS_THREAD_INIT:
        # Back to the idle state: hidden, empty bar.
        self.progressbar.setVisible(False)
        self.progressbar.setFormat("")
        self.progressbar.reset()
        self.progressbar.setObjectName(self.progressbar_color)
        self.statusbar.clearMessage()
    elif value == PROGRESS_THREAD_START:
        self.statusbar.clearMessage()
        self.progressbar.setFormat("Measurement started")
        self.progressbar.setValue(value)
        self.progressbar.setObjectName(self.progressbar_color)
        self.daq_analysis_completed = False
    elif value == PROGRESS_THREAD_ABORTING:
        self.progressbar.setFormat(
            "Aborting procedure at the next available break point")
        self.progressbar.setObjectName(self.progressbar_abort)
        # Take the bar's value from the first number in the message text.
        prog_val = re.findall(r'\d+', message)
        if prog_val:
            self.progressbar.setValue(int(prog_val[0]))
        else:
            self.progressbar.setValue(int(10))
        mess = "Aborting measurement procedure"
        self.show_log_message(
            MsgSeverity.WARN.name, _pymodule, _line(), mess)
        #self.statusbar.showMessage(mess)
    elif value == PROGRESS_THREAD_ABORTED:
        self.progressbar.setFormat("Procedure aborted")
        self.progressbar.setObjectName(self.progressbar_abort)
        mess = "Measurement procedure aborted"
        self.show_log_message(
            MsgSeverity.WARN.name, _pymodule, _line(), mess)
        self.statusbar.showMessage(mess)
        self.daq_analysis_completed = False
        # Return to the idle state after 2 s.
        QTimer.singleShot(2000, lambda: self.trigger_progressbar.emit(
            PROGRESS_THREAD_INIT))
    elif value == PROGRESS_THREAD_ERROR:
        mess = "Error in Thread. No data returned! See Log window"
        self.progressbar.setFormat(mess)
        self.progressbar.setObjectName(self.progressbar_abort)
        self.statusbar.showMessage(mess)
        # Leave the error visible for 10 s before returning to idle.
        QTimer.singleShot(10000, lambda: self.trigger_progressbar.emit(
            PROGRESS_THREAD_INIT))
    elif value == PROGRESS_THREAD_END:
        self.progressbar.setFormat("Measurement completed")
        self.progressbar.setValue(value)
        self.progressbar.setObjectName(self.progressbar_color)
        self.daq_analysis_completed = True
        if self.autopost_hdf:
            self.hdf_save_completed = False
        # NOTE(review): hiding the bar after 2 s is assumed to be
        # unconditional, not part of the autopost_hdf branch - confirm.
        QTimer.singleShot(2000, lambda: self.progressbar.setVisible(False))
    else:
        # Ordinary progress report.
        self.progressbar.setFormat(message)
        self.progressbar.setValue(value)
        self.progressbar.setObjectName(self.progressbar_color)
    # Re-polish so that a changed object name is picked up by the style.
    self.progressbar.style().polish(self.progressbar)
    QApplication.processEvents()
    time.sleep(0.0001)
def initialize_application(self, appname=_appname, delay=None,
                           facility=Facility.SwissFEL):
    """
    Initialize application configuration for use by QSettings.
    Loads application stylesheet from qrc_resources or input file
    These are saved into $HOME/.config
    e.g., $HOME/.config/<facility>/appname.conf
    Provide QSplashScreen if delay is set.

    NOTE(review): self is used through setStyle, setOrganizationName,
    setStyleSheet, processEvents and flush, so it is presumably the
    QApplication instance rather than a window - confirm with callers.

    Args:
        appname: application name, used for QSettings and on the splash
            screen.
        delay: splash duration in seconds (raised to at least 5); no
            splash screen is created when None.
        facility: Facility member selecting the qrc_resources module.

    Returns:
        The QSplashScreen when delay is given.

    The process exits with status 1 if the stylesheet cannot be opened.
    """
    self.splash_appname = appname
    # The import itself is what matters: it brings in the facility's
    # qrc_resources module (qrc_resources is not referenced afterwards).
    if facility == Facility.SwissFEL:
        from pyqtacc.qrc_resources.facility.sf.pyrcc5 import qrc_resources
        print("FACILITY SwissFEL")
    elif facility == Facility.SLS:
        from pyqtacc.qrc_resources.facility.sls.pyrcc5 import qrc_resources
        print("FACILITY SLS")
    elif facility == Facility.HIPA:
        from pyqtacc.qrc_resources.facility.hipa.pyrcc5 import qrc_resources
        print("FACILITY HIPA")
    elif facility == Facility.PROSCAN:
        from pyqtacc.qrc_resources.facility.proscan.pyrcc5 import qrc_resources
        print("FACILITY PROSCAN")
    elif facility == Facility.ESS:
        from pyqtacc.qrc_resources.facility.ess.pyrcc5 import qrc_resources
        print("FACILITY ESS")
    else:
        print("Unknown Facility; assuming SLS")
        from pyqtacc.qrc_resources.facility.sls.pyrcc5 import qrc_resources
    self.setStyle("motif")
    self.setOrganizationName(facility.name)
    self.setOrganizationDomain("psi.ch")
    self.setApplicationName(appname)
    self.setWindowIcon(QIcon(":/icon.png"))
    # Command-line options; only --qss is evaluated in this method.
    parser = argparse.ArgumentParser(description="stylesheet")
    parser.add_argument("-q", "--qss", action="store", dest="qss")
    parser.add_argument("-s", "--style")
    parser.add_argument("-f", "--facility")
    parser.add_argument("-u", "--user")
    #result, unknown = parser.parse_known_args(["--qss", ":/sfop.qss"])
    #result, unknown = parser.parse_known_args(["--qss", ":/sfop.qss",
    #"-c", "--facility", "--user"])
    #print(result)
    #print(unknown)
    result = parser.parse_args()
    if result.qss:
        print("qss file {0} supplied by user".format(result.qss))
    # Fall back to the stylesheet from the resource file.
    qss_file = result.qss if result.qss else ":/acc.qss"
    qss = QFile(qss_file)
    if qss.open(QIODevice.ReadOnly):
        fByteArray = qss.readAll()
        self.setStyleSheet(str(fByteArray.data(), encoding='utf-8'))
        qss.close()
    else:
        print("COULD NOT OPEN {0} stylesheet".format(qss_file))
        exit(1)
    if delay is not None:
        # The splash screen is shown for at least 5 seconds.
        delay = max(delay, 5)
        self.splash_screen = QSplashScreen(QPixmap(":/loading.gif"))
        self.splash_screen.setAttribute(Qt.WA_TranslucentBackground, False)
        self.splash_screen.setWindowFlags(Qt.WindowStaysOnTopHint)
        self.splash_screen.showMessage(
            """
<br><p style='color:black; font-weight:bold;
font-size:28px; margin-right:10px;'>
LOADING APPLICATION:
<font color=royalblue > {0} </font> </p>
<p style='color:black; font-weight:bold;
font-size:20px; margin-right:22px;'>
This message will self-destruct in {1} seconds<br></p>
""".format(appname, round(delay)),
            Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop))
        # Width grows by 15 px per character of appname beyond ten.
        width = 860 + (len(appname)-10)*15
        height = 220
        self.splash_screen.resize(width, height)
        #Maybe useful at some point
        #pSplashNotice = QCheckBox(self.splash_screen);
        #pSplashNotice.setChecked(Qt.Checked)
        self.splash_progressbar = QProgressBar(self.splash_screen)
        self.splash_timer = QTimer()
        self.splash_screen.show()
        # Custom progress bar stylesheet
        progressbar_stylesheet = """
QProgressBar:horizontal {
border: 1px solid royalblue;
background: white;
padding: 1px;
}
QProgressBar::chunk:horizontal {
background-color: qlineargradient(spread: pad, x1: 1, y1: 0.5,
x2: 0, y2: 0.5, stop: 0 royalblue, stop: 1 white);
}
"""
        self.splash_progressbar.setStyleSheet(progressbar_stylesheet)
        # Bar along the bottom edge of the splash screen, 10 px margins.
        self.splash_progressbar.setGeometry(
            10, self.splash_screen.height() - 40,
            self.splash_screen.width() - 20, 30)
        start1 = time.time()
        def show_time():
            # Timer callback: refresh the bar and the countdown text.
            now = time.time()
            # Elapsed fraction in percent, rounded to a multiple of 5.
            val = (now-start1)*100/delay
            val = 5 * round(val/5)
            int_seconds_remaining = int(delay - (now-start1))
            seconds_remaining = '{:2d}'.format(int_seconds_remaining)
            self.splash_progressbar.setValue(val)
            self.processEvents()
            self.flush()
            # Singular "second" only for exactly one second.
            sec_str = "s" if abs(int_seconds_remaining) != 1 else ""
            mess = """
<br><p style='color:black; font-weight:bold;
font-size:28px; margin-right:10px;'>
LOADING APPLICATION:
<font color=royalblue > {0} </font> </p>
<p style='color:black; font-weight:bold;
font-size:20px; margin-left:120px; margin-right:2px;'>
{1:>2} second{2} remaining<br></p>
""".format(self.splash_appname, seconds_remaining, sec_str)
            self.splash_screen.showMessage(
                mess, Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop))
            self.processEvents()
            #print(val, seconds_remaining)
        self.splash_timer.timeout.connect(show_time)
        self.splash_timer.start(200) # 200 ms period, i.e. 5 updates per second
        # NOTE(review): the return is assumed to belong to the
        # "delay is not None" branch (the splash screen only exists
        # there) - confirm.
        return self.splash_screen
def initialize_finished(self, myapp):
    """ Close the splash screen once initialization is complete.

    Stops the splash timer, fills the splash progress bar, sleeps for
    0.4 s, shows the "Ready" text and lets the splash screen finish on
    myapp.
    """
    self.splash_timer.stop()
    self.splash_progressbar.setValue(100)
    time.sleep(0.4)

    ready_text = """
<br><p style='color:black; font-weight:bold;
font-size:28px; margin-right:10px;'>
LOADING APPLICATION:
<font color=royalblue > {0} </font> </p>
<p style='color:black; font-weight:bold;
font-size:20px; margin-left:120px; margin-right:2px;'>
Ready.... &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
<br></p>
""".format(self.splash_appname)
    alignment = Qt.AlignmentFlag(Qt.AlignCenter | Qt.AlignTop)

    splash = self.splash_screen
    splash.showMessage(ready_text, alignment)
    splash.finish(myapp)
def check_status_list(self, pymodule: str = _pymodule,
                      operation: str = "channel access",
                      pv_list: list = None, status_list: list = None,
                      line: int = _line()):
    """ Log every PV of a group operation whose status is not normal.

    Args:
        pymodule: module name reported in the log entries.
        operation: label of the operation, used in the log text.
        pv_list: PV names; when None the method returns None at once.
        status_list: per-PV status codes. Entries that are None are
            looked up with cafe.getStatus(pv) and written back into the
            list; when the whole list is None a new list is built.
        line: line number reported in the log entries. The default is
            evaluated once, when this method is defined.

    Returns:
        The completed status list, or None if pv_list is None.

    Separator lines and a heading are emitted via trigger_log_message
    even when no PV reports an error.
    """
    if pv_list is None:
        return

    if status_list is None:
        # zip() cannot iterate None: start from placeholders so that
        # every PV is queried below.
        status_list = [None] * len(pv_list)

    if None in status_list:
        for i, (pv, stat) in enumerate(zip(pv_list, status_list)):
            if stat is None:
                status_list[i] = self.cafe.getStatus(pv)

    brk = ("------------------------------------------------------" +
           "------------------------------------------------------")
    self.trigger_log_message.emit(
        MsgSeverity.INFO.name, pymodule, line, brk, {})

    for i, (pv, stat) in enumerate(zip(pv_list, status_list)):
        if stat != self.cyca.ICAFE_NORMAL:
            mess = "Error in '{0}' for element [{1}], {2}.".format(
                operation, i, pv)
            # A fresh dict per message: a receiver that handles the
            # signal later must not see another PV's status.
            options = {
                'statusCode': (str(stat) + " " +
                               self.cafe.getStatusCodeAsString(stat)),
                'statusInfo': self.cafe.getStatusInfo(stat),
            }
            self.trigger_log_message.emit(
                MsgSeverity.WARN.name, pymodule, line, mess, options)

    self.trigger_log_message.emit(
        MsgSeverity.INFO.name, pymodule, line, brk, {})
    mess = ("The following devices reported an error " +
            "in channel access operation:")
    self.trigger_log_message.emit(
        MsgSeverity.INFO.name, pymodule, line, mess, {})
    return status_list
def check_status(self, pymodule: str = _pymodule,
                 operation: str = "channel access",
                 pv: str = None, stat: int = None,
                 line: int =_line()):
    """ Log a warning if the status of a single PV is not normal.

    Args:
        pymodule: module name reported in the log entry.
        operation: label of the operation, used in the log text.
        pv: PV name; when empty or None the method returns None at once.
        stat: status code; looked up with cafe.getStatus(pv) when None.
        line: line number reported in the log entry. The default is
            evaluated once, when this method is defined.

    Returns:
        The status code that was checked, or None if no pv was given.
    """
    if not pv:
        return

    # Identity test: None is a singleton.
    if stat is None:
        stat = self.cafe.getStatus(pv)

    if stat != self.cyca.ICAFE_NORMAL:
        mess = "Error in '{0}' for {1}.".format(operation, pv)
        options = {
            'statusCode': (str(stat) + " " +
                           self.cafe.getStatusCodeAsString(stat)),
            'statusInfo': self.cafe.getStatusInfo(stat),
        }
        self.trigger_log_message.emit(
            MsgSeverity.WARN.name, pymodule, line, mess, options)
    return stat