Files
tina/tina.py
2025-12-10 12:40:25 +01:00

568 lines
21 KiB
Python

'''Tina.py module for measuring the number of turns
'''
import os
import platform
import sys
import time
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtCore import PYQT_VERSION_STR, Slot
from qtpy.QtWidgets import QApplication, QMessageBox
from apps4ops.bdbase.base import BaseWindow
from apps4ops.bdbase import h5_storage, utils
from apps4ops.bdbase.enumkind import Facility, MsgSeverity, UserMode
from apps4ops.bdbase.helpbrowser import HelpBrowser
from apps4ops.hipa.sendeloghipa import QSendToELOG
from apps4ops.hipa.enumkind import ElogHIPA
from src.gui import AppGui
from pyrcc5 import tina_resources
_pymodule = os.path.basename(__file__)
_appname, _appext = _pymodule.split('.')
_abspath = os.path.dirname(os.path.abspath(__file__))
_appversion = '1.4.0'
_title = 'No of Turns Measurement'
_appname = 'Tina'
class StartMain(BaseWindow):
'''Application to measure the no of turns in Injector 2 and the
Ring Cyclotron.
A python implementation of the LabView application developed by
Pierre-Andre Duperrex
Attributes:
appname: tina.py
source_file: full pathname for inclusion into HDF.
elog_enum: enumerated datatypes for elog attributes.
message_elog: string to hold the elogbook message.
self.default_idx: int to indicate default accelerator.
self.accelerator: str to indicate default accelerator.
self.message: string to hold message to gui log pane.
'''
ring_cyclotron = 'Cyclotron'
injector_2 = 'Injector'
accelerator_list = [injector_2, ring_cyclotron]
def __init__(self, parent=None):
'''Initialize class attrinbutes and instantiate GUI.'''
super().__init__(
parent=parent, pymodule=_pymodule, appversion=_appversion,
title=_title, user_mode=UserMode.OPERATION, facility=Facility.HIPA,
has_optics=False, has_procedure='VerticalExpert')
self.appname = _appname
self.source_file = _abspath
self.elog_enum = ElogHIPA()
self.message_elog = None
self.default_idx = self.settings.data['Parameters']['accelerator'][
'data']['value']
self.accelerator = self.ring_cyclotron if self.default_idx else \
self.injector_2
self.accelerator_peak_search = ' ' + self.accelerator + ' '
# self.from_hdf = False in base class
self.message = ''
self.init_pv_dict = self.settings.data['Init'][self.accelerator]
self.init_pv_list = list(self.init_pv_dict.keys())
self.init_pv_values = list(self.init_pv_dict.values())
if self.is_front_end_initialized(init=False):
mess = 'FEE Initialized'
self.trigger_log_message.emit(
MsgSeverity.INFO.name, _pymodule, utils.line_no(), mess, {})
AppGui(self)
def prepare_results_message(self):
'''Prepare results messages
'''
try:
self.no_turns = self.all_data['Processed data']['nturns']
lag_full = self.all_data['Processed data']['lag']
delay = self.all_data['Processed data']['delay']
self.correlation_peak_diff = self.all_data['Processed data'][
'correlation_peak_diff']
self.correlation_min_peak_diff = self.all_data['Processed data'][
'correlation_min_peak_diff']
except KeyError:
self.message = ''
self.message_elog = ''
return
try:
self.accelerator = self.all_data['Input data']['accelerator']
except KeyError as ex:
logger_mess = f'KeyError {str(ex)}'
self.logger.debug(logger_mess)
try:
self.accelerator = self.all_data['Input data']['qtabdata']
except KeyError as ex:
logger_mess = f'KeyError {str(ex)}'
self.logger.debug(logger_mess)
mess = 'Reanalysis from HDF5. ' if self.from_hdf else ''
self.message_elog = (
mess +
'''No. turns measured in the {0} = {1:0.0f} ({2:.2f}) <br>
lag = {3}, delay = {4:.3f} \u00B5s<br>'''.format(
self.accelerator, (self.no_turns), self.no_turns,
lag_full, delay * 10**6))
self.message = (
mess +
'''
No. turns measured in the {0} = {1:0.0f} ({2:.2f})
lag = {3}, delay = {4:.3f} \u00B5s'''.format(
self.accelerator, self.no_turns, self.no_turns,
lag_full, delay * 10**6))
def prepare_elog_message(self):
'''Define elog parameters and define message
'''
self.projekt_idx = self.elog_enum.projekt.NONE
self.system_idx = self.elog_enum.system.BEAMDYNAMICS
self.eintrag_idx = self.elog_enum.eintrag.INFO
self.ort_idx = self.elog_enum.ort.RING_CYCLOTRON if \
'Cyclotron' in self.accelerator else self.elog_enum.ort.INJECTOR2
self.status_idx = self.elog_enum.status.NONE
self.effekt_idx = self.elog_enum.effekt.NO
self.attach_files = []
simulation = self.input_parameters['simulation']
if self.all_data:
if self.all_data['Input data'] is not None:
try:
simulation = self.all_data['Input data']['simulation']
except KeyError:
simulation = self.input_parameters['simulation']
pass
self.logbook = 'Sandkasten' if simulation else 'HIPA'
self.title = _title
def is_front_end_initialized(self, init: bool = True):
'''Initialized Front End Electonics if required
'''
value_list, status, status_list = self.cafe.getScalarList(
self.init_pv_list, dt='int')
if status != self.cyca.ICAFE_NORMAL:
self.check_status_list(
_pymodule, 'getScalarList', self.init_pv_list, status_list,
utils.line_no())
if value_list != self.init_pv_values and not init:
return False
def is_simulation():
if self.all_data:
if self.all_data['Input data'] is not None:
try:
simulation = self.all_data['Input data']['simulation']
except KeyError:
simulation = self.input_parameters['simulation']
pass
else:
simulation = self.input_parameters['simulation']
return simulation
if value_list != self.init_pv_values:
for val, val_init, pv in zip(value_list, self.init_pv_values,
self.init_pv_list):
if val != val_init:
if not is_simulation():
status = self.cafe.set(pv, val_init)
if status != self.cyca.ICAFE_NORMAL:
self.check_status(_pymodule, 'set', pv, status,
utils.line_no())
time.sleep(0.02)
value_list, status, status_list = self.cafe.getScalarList(
self.init_pv_list, dt='int')
if status != self.cyca.ICAFE_NORMAL:
self.check_status_list(
_pymodule, 'getScalarList', self.init_pv_list, status_list,
utils.line_no())
if value_list != self.init_pv_values:
return False
return True
def verify_analysis_preconditions(self):
'''Verify machine is setup for measurement procedure-
Check machine current is above threshold (0.001 mA is default)
Check that that the front-end electronics are initialized
'''
accelerator = self.input_parameters['accelerator']
'''
if self.injector_2 in accelerator:
mess = ('Measurement procedure for Injector 2 \n' +
'has not yet been implemented.')
QMessageBox.information(self, 'Injector 2', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
'''
if self.input_parameters['simulation']:
return True
min_current = float(self.input_parameters[self.accelerator_peak_search][
'minimumCurrent'])
injector_current = self.cafe.getCache('MWC2:IST:2')
if not injector_current:
stat = self.cafe.getStatus('MWC2:IST:2')
self.check_status(_pymodule, 'getCache', injector_current, stat,
utils.line_no())
mess = ('Unable to read Injector 2 current\n' +
'Please try again.')
QMessageBox.information(self, 'Cyclotron', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
elif injector_current <= min_current:
mess = (f'Injector 2 current is below threshold {min_current} mA.' +
'\nMeasurement cannot be untertaken at the present time.')
QMessageBox.information(self, 'Cyclotron', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
return self.is_front_end_initialized(init=True)
@Slot()
def analysis_thread_finished(self):
'''If analysis completed successfuly, prepare results message.
'''
BaseWindow.analysis_thread_finished(self)
def delete_previous_figures():
print('Thread finished with no data')
ncanvas = len(self.settings.data['GUI']['subResultsTabTitle'])
dict_fig = {}
dict_fig['Figure data'] = {}
print('canvas', ncanvas, flush=True)
for i in range(0, ncanvas):
canvas = f'Canvas {i+1}'
dict_fig['Figure data'][canvas] = None
# Delete old figures
try:
self.gui_frame.canvas_update(dict_fig['Figure data'])
except AttributeError as ex:
print('AttributeError', ex, flush=True)
pass
print('Thread finished with no data', flush=True)
if self.all_data is not None:
try:
try:
if self.all_data['Figure data'] is not None:
self.gui_frame.central_tab_widget.setCurrentIndex(1)
else:
delete_previous_figures()
except AttributeError:
print('No analysis performed')
return
except KeyError:
print('No analysis performed')
return
else:
delete_previous_figures()
return
self.prepare_results_message()
self.show_log_message(MsgSeverity.INFO, _pymodule, utils.line_no(),
self.message)
@Slot()
def hdf_thread_finished(self):
'''If hdf analysis completed successfuly, prepare results
message.
'''
BaseWindow.hdf_thread_finished(self)
def delete_previous_figures():
print('Thread finished with no data')
ncanvas = len(self.settings.data['GUI']['subResultsTabTitle'])
dict_fig = {}
dict_fig['Figure data'] = {}
print('canvas', ncanvas, flush=True)
for i in range(0, ncanvas):
canvas = f'Canvas {i+1}'
dict_fig['Figure data'][canvas] = None
# Delete old figures
try:
self.gui_frame.canvas_update(dict_fig['Figure data'])
except AttributeError as ex:
print('AttributeError', ex, flush=True)
pass
print('Thread finished with no data', flush=True)
if self.all_data is not None:
try:
try:
if self.all_data['Figure data'] is not None:
self.gui_frame.central_tab_widget.setCurrentIndex(1)
else:
delete_previous_figures()
except AttributeError:
print('No analysis performed')
return
except KeyError:
print('No analysis performed')
return
else:
delete_previous_figures()
return
print("hdf_finished", flush=True)
self.prepare_results_message()
self.show_log_message(MsgSeverity.INFO, _pymodule, utils.line_no(),
self.message)
@Slot()
def save_to_hdf_dialog(self):
'''Save to hdf via the HDF dialogue window in menu bar.
'''
if self.from_hdf:
mess = ('This is a repeat analysis from HDF. \n' +
'Saving duplicate data to HDF is declined.')
QMessageBox.information(self, 'HDF', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
BaseWindow.save_to_hdf_dialog(self)
@Slot()
def save_to_hdf(self, from_dialog=False):
'''Save to hdf via pilot mode.
'''
if not self.verify_save_to_hdf():
return False
if self.from_hdf:
mess = ('This is a repeat analysis from HDF. \n' +
'Saving duplicate data to HDF is declined.')
QMessageBox.information(self, 'HDF', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
if self.all_data is not None:
self.save_hdf_thread = self.HDFSave(self, from_dialog)
# self.save_hdf_thread.started.connect(self.save_to_hdf_started)
# self.save_hdf_thread.finished.connect(self.save_to_hdf_finished)
self.save_hdf_thread.start()
self.hdf_save_completed = True
time.sleep(0.05) # Wait a tick
return True
def add_to_hdf(self, data_hdf, proc=True, raw=False):
'''User supplied hdf data
'''
if self.all_data is not None:
all_data = {}
all_data['Raw data'] = {}
all_data['Raw data']['Input_data'] = self.all_data[
'Input data']
all_data['Raw data']['Ambient_data'] = self.all_data[
'Ambient data']
all_data['Raw data']['Processed_data'] = self.all_data[
'Processed data']
all_data['Raw data']['Raw_data'] = self.all_data[
'Raw data']
h5_storage.saveH5Recursive(
self.hdf_filename, all_data['Raw data'], data_hdf)
@Slot()
def send_to_elog(self):
'''Override abstract method
'''
@Slot()
def save_fig_thread_finished():
'''Can take a few seconds to send to elog,
hence choose do this in a thread.
'''
time.sleep(0.2)
if self.all_data:
QSendToELOG(self, logbook=self.logbook,
projektIdx=self.projekt_idx,
eintragIdx=self.eintrag_idx,
systemIdx=self.system_idx,
statusIdx=self.status_idx,
ortIdx=self.ort_idx,
effektIdx=self.effekt_idx,
title=self.title,
message=self.message_elog,
attachFile=self.attach_files)
time.sleep(0.5)
self.prepare_elog_message()
if not self.all_data:
QSendToELOG(self, logbook=self.logbook,
projektIdx=self.projekt_idx,
eintragIdx=self.eintrag_idx,
systemIdx=self.system_idx,
statusIdx=self.status_idx,
ortIdx=self.ort_idx,
effektIdx=self.effekt_idx,
title=self.title,
message=self.message_elog,
attachFile=self.attach_files)
return
folder_name = self.elog_dest
if not os.path.exists(folder_name):
os.makedirs(folder_name)
time_in_seconds = self.all_data['Ambient data']['Time in seconds']
try:
reanalysis_time = self.all_data['Processed data'][
'Reanalysis time in seconds']
except KeyError:
reanalysis_time = None
self.folder_name = folder_name
save_fig_thread = self.SaveFigureThread(
self, self.folder_name, time_in_seconds, reanalysis_time)
save_fig_thread.finished.connect(save_fig_thread_finished)
save_fig_thread.start()
time.sleep(0.05)
def save_to_epics(self):
''' Write the number of turns calculated to an EPICS PV
'''
if not BaseWindow.verify_save_to_epics(self):
return False
if self.from_hdf:
mess = ('This is a repeat analysis from HDF. \n' +
'Analysis results are not saved to EPICS')
QMessageBox.information(self, 'EPICS', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
if self.correlation_peak_diff < self.correlation_min_peak_diff:
mess = ('Measurement is suspect as difference in top two peak \n' +
'values of ' +
f'{self.peak_diff:0.3f} is less than accepted minimum ' +
f'of {self.correlation_min_peak_diff:0.3f}. \n' +
'Analysis result is not saved to EPICS!')
QMessageBox.information(self, 'EPICS', mess, QMessageBox.Ok)
QApplication.processEvents()
return False
simulation = self.input_parameters['simulation']
if not simulation:
dict_bunch = {}
nturns = round(self.no_turns)
pv = self.settings.data['PV']['Cyclotron']['nturns']
dict_bunch[pv] = nturns
status, _ = self.send_to_epics(dict_bunch)
if status == self.cyca.ICAFE_NORMAL:
mess = f'Saved data to EPICS; No turns = {nturns}'
sev = MsgSeverity.INFO
else:
mess = f'Value (nturns={nturns}) not saved to epics'
sev = MsgSeverity.ERROR
self.show_log_message(sev.name, _pymodule, utils.line_no(), mess)
return True
@Slot()
def closeEvent(self, event):
'''Close application only if conditions allow.
'''
if not self.verify_close_event():
event.ignore()
return
BaseWindow.closeEvent(self, event)
@Slot()
def show_about(self):
'''Behind the scences information
'''
QApplication.processEvents()
QMessageBox.about(
self, 'About',
"""<b>{0}</b> v {1}
<p>Copyright &copy; Paul Scherrer Institut (PSI).
All rights reserved.</p>
<p>P.-A. Duperrex, J. Chrin, A. Facchetti, D. Felici,
W. Koprek, J. Sun </p>
<p>A python implementation of the LabVIEW measurement originally
developed by P.-A. Duperrex <br>
Ref: P.-A. Duperrex and A. Facchetti <br>
'Number of Turn Measurements on the HIPA Cyclotrons at PSI' <br>
doi:10.18429/JACoW-IPAC2018-WEPAL067 </p>
<p>Contact: J. Sun, WBGA/C32, Tel. x4127,
jilei.sun@psi.ch </p>
<p>Responsible (low-level): W. Koprek, WBBA/315, Tel. x3765,
waldemar.koprek@psi.ch </p>
<p>Responsible (high-level): J. Chrin, WBBA/318, Tel. x2930,
jan.chrin@psi.ch </p>
<p>A main-window style application for the measurement of
the number of turns in the HIPA cyclotron and injector </p>
<p>Python {2} - Qt {3} - PyQt {4} <br>
cafe {5} - epics {6} on {7}""".format(
_pymodule, _appversion, platform.python_version(),
QT_VERSION_STR, PYQT_VERSION_STR,
self.cafe.CAFE_version(), self.cafe.EPICS_version(),
platform.system()))
QApplication.processEvents()
@Slot()
def show_help(self):
'''Invoke help pages from tina_resources
'''
index_html = 'index.html'
help_base = ':'
help_page = HelpBrowser(help_base, index_html, self, xlength=800,
ylength=1290)
help_page.show()
#########################################################################
if __name__ == '__main__':
'''Initialize window parameters and start application via a splash
window.
'''
app = QApplication(sys.argv)
splash = BaseWindow.initialize_application(
app, appname=_appname, delay=5, facility=Facility.HIPA)
myapp = StartMain()
myapp.show()
if splash is not None:
splash.finish(myapp)
app.exec_()