From d9f098582f80fe7bd91f2e0dbd65e821af080293 Mon Sep 17 00:00:00 2001 From: chrin Date: Thu, 15 Sep 2022 08:15:54 +0000 Subject: [PATCH] Delete pvwidgets.py- --- pvwidgets.py- | 3112 ------------------------------------------------- 1 file changed, 3112 deletions(-) delete mode 100644 pvwidgets.py- diff --git a/pvwidgets.py- b/pvwidgets.py- deleted file mode 100644 index 3e05f6d..0000000 --- a/pvwidgets.py- +++ /dev/null @@ -1,3112 +0,0 @@ -''' Module with channel access enabled QtWidgets.''' -__author__ = 'Jan T. M. Chrin' - -import re -import time - -import collections -import numpy as np -from sklearn.linear_model import LinearRegression -from distutils.version import LooseVersion -from functools import reduce as func_reduce - -from qtpy.QtCore import QEventLoop, QPoint, Qt, QThread, QTimer, Signal, Slot -from qtpy.QtGui import (QCloseEvent, QColor, QCursor, QFont, QFontMetricsF, - QIcon, QKeySequence) -from qtpy.QtCore import __version__ as QT_VERSION_STR -from qtpy.QtWidgets import (QAbstractItemView, QAbstractSpinBox, QAction, - QApplication, QBoxLayout, QCheckBox, QComboBox, - QDialog, QDockWidget, QDoubleSpinBox, QFrame, - QGroupBox, QHBoxLayout, QLabel, QLineEdit, - QListWidget, QMenu, QMessageBox, QPushButton, - QSpinBox, QStyle, QStyleOptionSpinBox, QTableWidget, - QTableWidgetItem, QVBoxLayout, QWidget) - -import pyqtgraph as pg -from pyqtgraph import PlotWidget -from caqtwidgets.pvgateway import PVGateway - -class QTaggedLineEdit(QWidget): - def __init__(self, label_text=str(""), value="", - position="LEFT", parent=None): - super(QTaggedLineEdit, self).__init__(parent) - self.parameter = str(value) - self.label = QLabel(label_text) - self.label.setObjectName("Tagged") - self.label.setFixedHeight(24) - self.label.setContentsMargins(10, 0, 0, 0) - #self.label.setFixedWidth(80) - self.line_edit = QLineEdit(self.parameter) - self.line_edit.setObjectName("Write") - self.line_edit.setFixedHeight(24) - font = QFont("sans serif", 16) - fm = QFontMetricsF(font) - self.line_edit.setMaximumWidth(fm.width(self.parameter)+20) - self.label.setBuddy(self.line_edit) - layout = QBoxLayout( - QBoxLayout.LeftToRight if position == "LEFT" else \ - QBoxLayout.TopToBottom) - layout.addWidget(self.label) - layout.addWidget(self.line_edit) - layout.addStretch() - layout.setSpacing(2) - layout.setContentsMargins(0, 0, 0, 0) - self.setLayout(layout) - -class QHLine(QFrame): - def __init__(self): - super(QHLine, self).__init__() - self.setFrameShape(QFrame.HLine) - self.setFrameShadow(QFrame.Sunken) - -class QVLine(QFrame): - def __init__(self): - super(QVLine, self).__init__() - self.setFrameShape(QFrame.VLine) - self.setFrameShadow(QFrame.Sunken) - -class AppQLineEdit(QLineEdit): - def __init__(self, parent=None): - #super().__init__(parent) - pass - def leaveEvent(self, event): - self.clearFocus() - del event - -class CAQLineEdit(QLineEdit, PVGateway): - '''Channel access enabled QLineEdit widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - trigger_daq = Signal(object, str, int) - trigger_daq_int = Signal(object, str, int) - trigger_daq_str = Signal(object, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units: bool = False, prefix: str = "", suffix: str = "", - notify_freq_hz: int = 0, precision: int = 0): - - super().__init__(parent, pv_name, monitor_callback, - pv_within_daq_group, color_mode, show_units, prefix, - suffix, connect_callback=self.py_connect_callback, - notify_freq_hz=notify_freq_hz, precision=precision) - - self.is_initialize_complete() - self.configure_widget() - - if not self.pv_within_daq_group: - self.monitor_start() - - def py_connect_callback(self, handle, pvname, status): - '''Callback function to be invoked on change of pv connection status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - @Slot(object, str, int) - def receive_daq_update(self, daq_pvd, daq_mode, daq_state): - PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.setFocusPolicy(Qt.NoFocus) - - fm = QFontMetricsF(QFont("Sans Serif", 10)) - qrect = fm.boundingRect(self.suggested_text) - _width_scaling_factor = 1.15 - self.setFixedHeight((fm.lineSpacing()*1.8)) - self.setFixedWidth(((qrect.width()) * _width_scaling_factor)) - - if self.pv_within_daq_group: - self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA) - else: - self.qt_property_initial_values(qt_object_name=self.PV_READBACK) - - #renove highlighting which persists after mouse leaves - def mouseMoveEvent(self, event): - #event.ignore() - pass - - def leaveEvent(self, event): - self.clearFocus() - del event - -class CAQLabel(QLabel, PVGateway): - '''Channel access enabled QLabel widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - - trigger_connect = Signal(int, str, int) - - trigger_daq = Signal(object, str, int) - trigger_daq_int = Signal(object, str, int) - trigger_daq_str = Signal(object, str, int) - - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units: bool = False, prefix: str = "", suffix: str = "", - notify_freq_hz: int = 0, precision: int = 0): - - super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, - color_mode, show_units, prefix, suffix, - connect_callback=self.py_connect_callback, - notify_freq_hz=notify_freq_hz, precision=precision) - - self.is_initialize_complete() - - self.configure_widget() - - if self.pv_within_daq_group is False: - self.monitor_start() - - def py_connect_callback(self, handle, pvname, status): - '''Callback function to be invoked on change of - pv connection status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - @Slot(object, str, int) - def receive_daq_update(self, daq_pvd, daq_mode, daq_state): - PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.setFocusPolicy(Qt.NoFocus) - - fm = QFontMetricsF(QFont("Sans Serif", 10)) - qrect = fm.boundingRect(self.suggested_text) - _width_scaling_factor = 1.15 - - self.setFixedHeight((fm.lineSpacing()*1.8)) - self.setFixedWidth((qrect.width() * _width_scaling_factor)) - - if self.pv_within_daq_group: - self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA) - else: - self.qt_property_initial_values(qt_object_name=self.PV_READBACK) - -#For use with CAQMenu -class QLineEditExtended(QLineEdit): - def __init__(self, parent=None): - super().__init__(parent) - self.parent = parent - - def mousePressEvent(self, event): - button = event.button() - if button == Qt.RightButton: - self.parent.showContextMenu() - elif button == Qt.LeftButton: - self.parent.mousePressEvent(event) - -class CAQMenu(QComboBox, PVGateway): - '''Channel access enabled QMenu widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units=False, prefix: str = "", suffix: str = ""): - - super().__init__(parent, pv_name, monitor_callback, - pv_within_daq_group, color_mode, show_units, prefix, - suffix, connect_callback=self.py_connect_callback) - - self.is_initialize_complete() - - self.configure_widget() - - #After configure:widget - self.currentIndexChanged.connect(self.value_change) - - if self.pv_within_daq_group is False: - self.monitor_start() - - def py_connect_callback(self, handle, pvname, status): - '''Callback function to be invoked on change of - pv connection status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - def configure_widget(self): - - self.previousIndex = None - - self.setFocusPolicy(Qt.NoFocus) - self.setEditable(True) - self.setLineEdit(QLineEditExtended(self)) - self.lineEdit().setReadOnly(True) - self.lineEdit().setAlignment(Qt.AlignCenter) - - enumStringList = self.cafe.getEnumStrings(self.handle) - - self.addItems(enumStringList) - for i in range(0, self.count()): - self.setItemData(i, Qt.AlignCenter, Qt.TextAlignmentRole) - - fm = QFontMetricsF(QFont("Sans Serif", 10)) - qrect = fm.boundingRect(self.suggested_text) - - _width_scaling_factor = 1.1 - - self.setFixedHeight(fm.lineSpacing()*1.8) - self.setFixedWidth((qrect.width()+40) * _width_scaling_factor) - - self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) - - def post_display_value(self, value): - '''Convert value to index''' - if "setCurrentIndex" in dir(self): - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - - if isinstance(value, str): - self.setCurrentIndex(self.cafe.getEnumFromString(self.handle, - value)) - - elif isinstance(value, int): - self.setCurrentIndex(value) - #Should not happen - elif isinstance(value, float): - self.setCurrentIndex(int(value)) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - - #self.previousIndex = self.currentIndex() - return - else: - print(("ERROR: overloaded post_display_value: 'setCurrentIndex' " - "method does not exist!")) - - - def value_change(self, indx): - - status = self.cafe.set(self.handle, indx) - - if status != self.cyca.ICAFE_NORMAL: - #self.showSetErrorMsg(status) - - value = self.cafe.getCache(self.handle, 'int') - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - - if value is not None: - self.setCurrentIndex(value) - else: - if self.previousIndex is not None: - self.setCurrentIndex(self.previousIndex) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - self.pv_message_in_a_box.setText( - "CAQMenu set operation reports error:\n{0}".format( - self.cafe.getStatusCodeAsString(status))) - self.pv_message_in_a_box.exec() - - def mousePressEvent(self, event): - - button = event.button() - if button == Qt.RightButton: - PVGateway.mousePressEvent(self, event) - - elif self.pv_info is not None: - if self.pv_info.accessWrite == 0: - event.ignore() - return - else: - QComboBox.mousePressEvent(self, event) - - self.previousIndex = self.currentIndex() - - def enterEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - for i in range(0, self.count()): - self.setItemIcon(i, QIcon(":/forbidden.png")) - self.setStyleSheet( - ("QComboBox {background: transparent}" + - "QComboBox::drop-down {image: url(:/forbidden.png)}")) - - def leaveEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - for i in range(0, self.count()): - self.setItemIcon(i, QIcon()) - self.setStyleSheet( - "QComboBox::drop-down {background: transparent}") - - - #The widget should not gain focus by using the mouse wheel. - #This is accomplished by setting the focus policy to Qt.StrongFocus. - #The widget should only accept wheel events if it already has the - #focus. This is accomplished by reimplementing QWidget.wheelEvent - #within a QSpinBox subclass: - def wheelEvent(self, event): - if self.hasFocus() is False: - event.ignore() - else: - QComboBox.wheelEvent(self, event) - - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - '''Triggered by monitor signal''' - - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - -class CAQMessageButton(QPushButton, PVGateway): - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - notify_freq_hz: int = 0, - pv_within_daq_group: bool = False, color_mode=None, - show_units=False, msg_label: str = "", - msg_press_value=None, msg_release_value=None, - start_monitor=False): - super().__init__(parent=parent, pv_name=pv_name, - monitor_callback=monitor_callback, - notify_freq_hz=notify_freq_hz, - pv_within_daq_group=pv_within_daq_group, - color_mode=color_mode, show_units=show_units, - msg_label=msg_label, - connect_callback=self.py_connect_callback) - - self.msg_press_value = msg_press_value - self.msg_release_value = msg_release_value - - if self.msg_press_value is not None: - self.pressed.connect(self.act_on_pressed) - if self.msg_release_value is not None: - self.released.connect(self.act_on_released) - - self.msg_label = msg_label - self.suggested_text = self.msg_label - _suggested_text_length = len(self.suggested_text)+3 - self.suggested_text = self.suggested_text.rjust(_suggested_text_length, - "^") - - self.configure_widget() - - self.msg_press_status = self.cyca.ICAFE_NORMAL - self.msg_release_status = self.cyca.ICAFE_NORMAL - self.msg_report_status = "PV={0}\n".format(self.pv_name) - self.msg_has_error = False - - if not self.pv_within_daq_group and start_monitor: - self.monitor_start() - - def py_connect_callback(self, handle, pvname, status): - '''Callback function to be invoked on change of - pv connection status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.setFocusPolicy(Qt.StrongFocus) - self.setCheckable(True) #Recognizes press and release states - - fm = QFontMetricsF(QFont("Sans Serif", 12)) - qrect = fm.boundingRect(self.suggested_text) - - _width_scaling_factor = 1.0 - - self.setText(self.msg_label) - self.setFixedHeight((fm.lineSpacing()*2.0)) - self.setFixedWidth((qrect.width() * _width_scaling_factor)) - - self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) - - - def enterEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - self.setProperty("readOnly", True) - self.qt_style_polish() - - def leaveEvent(self, event): - if self.property("readOnly"): - self.setProperty(self.qt_dynamic_property_get(), True) - self.qt_style_polish() - - def mouseReleaseEvent(self, event): - if self.msg_release_value is not None: - time.sleep(0.1) - QPushButton.mouseReleaseEvent(self, event) - - def mousePressEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 1: - QPushButton.mousePressEvent(self, event) - if event.button() == Qt.RightButton: - PVGateway.mousePressEvent(self, event) - - def act_on_pressed(self): - if self.msg_press_value is not None: - self.msg_press_status = self.cafe.set(self.handle, - self.msg_press_value) - if self.msg_press_status != self.cyca.ICAFE_NORMAL: - self.msg_report_status += ( - "Error in set operation (at press button):\n{0}\n".format( - self.cafe.getStatusCodeAsString(self.msg_press_status))) - self.msg_has_error = True - qm = QMessageBox() - qm.setText(self.msg_report_status) - qm.exec() - QApplication.processEvents() - - def act_on_released(self): - if self.msg_release_value is not None: - self.msg_release_status = self.cafe.set(self.handle, - self.msg_release_value) - if self.msg_release_status != self.cyca.ICAFE_NORMAL: - self.msg_report_status += ( - "Error in set operation (at release button):\n{0}\n".format( - self.cafe.getStatusCodeAsString(self.msg_release_status))) - self.msg_has_error = True - - if self.msg_has_error: - self.msg_has_error = False - self.pv_message_in_a_box.setText(self.msg_report_status) - self.pv_message_in_a_box.exec() - self.msg_report_status = "PV={0}\n".format(self.pv_name) - qm = QMessageBox() - qm.setText(self.msg_report_status) - qm.exec() - QApplication.processEvents() - -class CAQTextEntry(QLineEdit, PVGateway): - '''Channel access enabled QTextEntry widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units=False, prefix: str = "", suffix: str = ""): - super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, - color_mode, show_units, prefix, suffix, - connect_callback=self.py_connect_callback) - - self.is_initialize_complete() #waits a fraction of a second - - self.currentText = "" - self.returnPressed.connect(self.valuechange) - self.configure_widget() - if self.pv_within_daq_group is False: - self.monitor_start() - - def py_connect_callback(self, handle, pvname, status): - '''Callback function to be invoked on change of - pv connection status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.setFocusPolicy(Qt.StrongFocus) - - fm = QFontMetricsF(QFont("Sans Serif", 12)) - qrect = fm.boundingRect(self.suggested_text) - - _width_scaling_factor = 1.15 - - self.setFixedHeight((fm.lineSpacing()*1.8)) - self.setFixedWidth(((qrect.width()+10) * _width_scaling_factor)) - - self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) - - def valuechange(self): - status = self.cafe.set(self.handle, self.text()) - if status != self.cyca.ICAFE_NORMAL: - if self.cafe.getNoMonitors(self.handle) == 0: - val = self.cafe.get(self.handle, 'native') - else: - val = self.cafe.getCache(self.handle, 'native') - - if val is not None: - if isinstance(val, str): - strText = val - else: - valStr = ("{: .%sf}" % self.precision) - strText = valStr.format(round(val, self.precision)) - print(strText, " precision ", self.precision) - self.setText(strText) - else: - #Do this for TextInfo cache - if self.cafe.getNoMonitors(self.handle) == 0: - val = self.cafe.get(self.handle, 'native') - - def setText(self, value): - QLineEdit.setText(self, value) - self.currentText = self.text() - - def enterEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - self.setProperty("readOnly", True) - self.qt_style_polish() - self.setReadOnly(True) - self.setFocusPolicy(Qt.StrongFocus) - - def leaveEvent(self, event): - - if self.isReadOnly(): - self.setReadOnly(False) - self.setProperty(self.qt_dynamic_property_get(), True) - self.qt_style_polish() - - if self.text() != self.currentText: - QLineEdit.setText(self, self.currentText) - - self.setCursorPosition(100) - self.clearFocus() - self.setFocusPolicy(Qt.NoFocus) - del event - - def mousePressEvent(self, event): - if event.button() == Qt.RightButton: - PVGateway.mousePressEvent(self, event) - self.clearFocus() - return - local_event_position = QPoint(event.x(), event.y()) - local_cursor_position = self.cursorPositionAt(local_event_position) - self.setCursorPosition(local_cursor_position) - - -class CAQSpinBox(QSpinBox, PVGateway): - '''Channel access enabled QTextEntry widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units=False, prefix: str = "", suffix: str = ""): - super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, - color_mode, show_units, prefix, suffix, - connect_callback=self.py_connect_callback) - - self.is_initialize_complete() - - self.valueChanged.connect(self.value_change) - self.configure_widget() - if not self.pv_within_daq_group: - self.monitor_start() - - - def py_connect_callback(self, handle, pvname, status): - ''' - Callback function to be invoked on change of pv connection - status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.previousValue = None - self.currentValue = None - self.setFocusPolicy(Qt.StrongFocus) - self.setButtonSymbols(QAbstractSpinBox.UpDownArrows) - self.setAccelerated(False) - self.setLineEdit(QLineEditExtended(self)) - self.lineEdit().setEnabled(True) - self.lineEdit().setReadOnly(False) - self.lineEdit().setAlignment(Qt.AlignLeft) - self.lineEdit().setFont(QFont("Sans Serif", 16)) - - fm = QFontMetricsF(QFont("Sans Serif", 12)) - - _suggested_text = self.max_control_abs_str - _added_text = "" - - if self.show_units: - _added_text += " " + self.units - _suggested_text += self.units - if self.suffix: - _added_text += " " + self.suffix - _suggested_text += self.suffix - - self.setSuffix(_added_text) - - qrect = fm.boundingRect(_suggested_text) - _width_scaling_factor = 1.0 - - self.setFixedHeight((fm.lineSpacing()*1.8)) - self.setFixedWidth(((qrect.width()) * _width_scaling_factor)) - - self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) - - if self.pv_ctrl is not None: - self.setRange(int(self.pv_ctrl.lowerControlLimit), - int(self.pv_ctrl.upperControlLimit)) - - - def post_display_value(self, value): - '''Convert value to index''' - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - self.setValue(int(round(value))) - self.blockSignals(False) - else: - self.setValue(int(round(value))) - - - def mousePressEvent(self, event): - _opt = QStyleOptionSpinBox() - self.initStyleOption(_opt) - _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt, - QStyle.SC_SpinBoxUp, self) - _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt, - QStyle.SC_SpinBoxDown, self) - - self.previousValue = self.value() - - if event.button() == Qt.LeftButton: - if _rect_up.contains(event.pos(), proper=True) or \ - _rect_down.contains(event.pos(), proper=True): - - if not self.cafe.isConnected(self.handle): - self.pv_message_in_a_box.setText( - ("Spinbox change value events currently suspended\n" + - "as channel {0} is disconnected.").format( - self.pv_name)) - self.pv_message_in_a_box.exec() - return - - QSpinBox.mousePressEvent(self, event) - #Clear Focus: only one step per mouse click. - self.clearFocus() - - local_event_position = QPoint(event.x(), event.y()) - local_cursor_position = self.lineEdit().cursorPositionAt( - local_event_position) - - self.lineEdit().setCursorPosition(local_cursor_position) - - PVGateway.mousePressEvent(self, event) - - def setValue(self, intVal): - QSpinBox.setValue(self, intVal) - self.currentValue = self.value() - - def value_change(self, intVal): - - status = self.cafe.set(self.handle, intVal) - if status != self.cyca.ICAFE_NORMAL: - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - - if self.previousValue is not None: - self.setValue(self.previousValue) - else: - _value = self.cafe.getCache(self.handle, 'int') - - if _value is not None: - self.setValue(_value) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - self.pv_message_in_a_box.setText( - ("Spinbox set operation reports error:\n{0}" - .format(self.cafe.getStatusCodeAsString(status)))) - self.pv_message_in_a_box.exec() - - else: - if self.previousValue is not None: - self.setValue(self.previousValue) - else: - _value = self.cafe.getCache(self.handle, 'int') - - if _value is not None: - self.setValue(_value) - - self.parent.statusbar.showMessage( - (self.widget_class + " " + - self.cafe.getStatusCodeAsString(status))) - - - def enterEvent(self, event): - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - self.setProperty("readOnly", True) - self.qt_style_polish() - self.setReadOnly(True) - self.setFocusPolicy(Qt.StrongFocus) - - def leaveEvent(self, event): - if self.isReadOnly(): - self.setReadOnly(False) - self.setProperty(self.qt_dynamic_property_get(), True) - self.qt_style_polish() - - self.clearFocus() - self.setFocusPolicy(Qt.NoFocus) - del event - - - def keyPressEvent(self, event): - if event.key() in (Qt.Key_Return, Qt.Key_Enter): - QSpinBox.keyPressEvent(self, event) - self.clearFocus() - elif event.key() in (Qt.Key_Up, Qt.Key_Down): - QSpinBox.keyPressEvent(self, event) - else: - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - QSpinBox.keyPressEvent(self, event) - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - - # The spin box should not gain focus by using the mouse wheel. - # This is accomplished by setting the focus policy to Qt.StrongFocus. - # The spin box should only accept wheel events if it already has the focus. - # This is accomplished by reimplementing QWidget.wheelEvent within a - # QSpinBox subclass: - def wheelEvent(self, event): - #print("wheelEvent", self.hasFocus()) - if self.hasFocus() is False: - event.ignore() - else: - QSpinBox.wheelEvent(self, event) - - -class CAQDoubleSpinBox(QDoubleSpinBox, PVGateway): - '''Channel access enabled QDoubleSpinBox widget''' - trigger_monitor_float = Signal(float, int, int) - trigger_monitor_int = Signal(int, int, int) - trigger_monitor_str = Signal(str, int, int) - trigger_monitor = Signal(object, int) - trigger_connect = Signal(int, str, int) - - def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, - pv_within_daq_group: bool = False, color_mode=None, - show_units: bool = False, prefix: str = "", suffix: str = ""): - super().__init__(parent=parent, pv_name=pv_name, - monitor_callback=monitor_callback, - pv_within_daq_group=pv_within_daq_group, - color_mode=color_mode, show_units=show_units, - prefix=prefix, suffix=suffix, - connect_callback=self.py_connect_callback) - - self.is_initialize_complete() - self.valueChanged.connect(self.valuechange) - self.configure_widget() - - if self.pv_within_daq_group is False: - self.monitor_start() - - - def py_connect_callback(self, handle, pvname, status): - ''' - Callback function to be invoked on change of pv connection - status. - ''' - self.trigger_connect.emit(int(handle), str(pvname), int(status)) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - PVGateway.receive_monitor_update(self, value, status, alarm_severity) - - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - PVGateway.receive_connect_update(self, handle, pv_name, status) - - def configure_widget(self): - self.previousValue = None - self.currentValue = None - self.setFocusPolicy(Qt.StrongFocus) - self.setButtonSymbols(QAbstractSpinBox.UpDownArrows) - self.setAccelerated(False) - self.setLineEdit(QLineEditExtended(self)) - self.lineEdit().setReadOnly(False) - self.lineEdit().setAlignment(Qt.AlignRight) - self.lineEdit().setFont(QFont("Sans Serif", 12)) - - _stepsize = 10**(self.precision * -1) - self.setSingleStep(_stepsize) - self.setDecimals(self.precision) - - fm = QFontMetricsF(QFont("Sans Serif", 12)) - - _suggested_text = self.suggested_text - _added_text = "" - - if self.show_units: - _added_text += " " + self.units - _suggested_text += self.units - if self.suffix: - _added_text += " " + self.suffix - _suggested_text += self.suffix - - self.setSuffix(_added_text) - - qrect = fm.boundingRect(_suggested_text) - - _width_scaling_factor = 1.15 - - self.setFixedHeight((fm.lineSpacing()*1.8)) - self.setFixedWidth(((qrect.width()) * _width_scaling_factor)) - - self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) - - if self.pv_ctrl is not None: - self.setRange(int(self.pv_ctrl.lowerControlLimit), - int(self.pv_ctrl.upperControlLimit)) - - - def post_display_value(self, value): - '''set value from monitor''' - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - self.setValue(value) - self.blockSignals(False) - else: - self.setValue(value) - - def mousePressEvent(self, event): - - _opt = QStyleOptionSpinBox() - self.initStyleOption(_opt) - _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt, - QStyle.SC_SpinBoxUp, self) - _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt, - QStyle.SC_SpinBoxDown, self) - self.previousValue = self.value() - - if event.button() == Qt.LeftButton: - if _rect_up.contains(event.pos(), proper=False) or \ - _rect_down.contains(event.pos(), proper=False): - - if not self.cafe.isConnected(self.handle): - self.pv_message_in_a_box.setText( - ("Spinbox change value events currently suspended\n" + - "as channel {0} is disconnected.").format( - self.pv_name)) - self.pv_message_in_a_box.exec() - return - - QDoubleSpinBox.mousePressEvent(self, event) - - local_event_position = QPoint(event.x(), event.y()) - local_cursor_position = self.lineEdit().cursorPositionAt( - local_event_position) - - self.lineEdit().setCursorPosition(local_cursor_position) - - PVGateway.mousePressEvent(self, event) - - def mouseReleaseEvent(self, event): - self.clearFocus() - - def setValue(self, value): - self.currentValue = self.value() - QDoubleSpinBox.setValue(self, value) - - def valuechange(self, fval): - status = self.cafe.set(self.handle, fval) - - if status != self.cyca.ICAFE_NORMAL: - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - if self.previousValue is not None: - self.setValue(self.previousValue) - else: - _value = self.cafe.getCache(self.handle, 'float') - - if _value is not None: - self.setValue(_value) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - self.pv_message_in_a_box.setText( - ("Spinbox set operation reports error:\n{0}" - .format(self.cafe.getStatusCodeAsString(status)))) - self.pv_message_in_a_box.exec() - - else: - if self.previousValue is not None: - self.setValue(self.previousValue) - else: - _value = self.cafe.getCache(self.handle, 'float') - - if _value is not None: - self.setValue(_value) - - self.parent.statusbar.showMessage( - (self.widget_class + " " + - self.cafe.getStatusCodeAsString(status))) - - - def enterEvent(self, event): - self.setFocusPolicy(Qt.StrongFocus) - if self.pv_info is not None: - if self.pv_info.accessWrite == 0: - self.setProperty("readOnly", True) - self.qt_style_polish() - self.setReadOnly(True) - - def leaveEvent(self, event): - if self.isReadOnly(): - self.setReadOnly(False) - self.setProperty(self.qt_dynamic_property_get(), True) - self.qt_style_polish() - - self.clearFocus() - self.setFocusPolicy(Qt.NoFocus) - del event - - def keyPressEvent(self, event): - - if event.key() in (Qt.Key_Return, Qt.Key_Enter): - QDoubleSpinBox.keyPressEvent(self, event) - self.clearFocus() - elif event.key() in (Qt.Key_Up, Qt.Key_Down): - QDoubleSpinBox.keyPressEvent(self, event) - else: - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(True) - QDoubleSpinBox.keyPressEvent(self, event) - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.blockSignals(False) - - # The spin box should not gain focus by using the mouse wheel. - # This is accomplished by setting the focus policy to Qt.StrongFocus. - # The spin box should only accept wheel events if it already has the focus. - # This is accomplished by reimplementing QWidget.wheelEvent within a - # QSpinBox subclass: - def wheelEvent(self, event): - if self.hasFocus() is False: - event.ignore() - else: - QDoubleSpinBox.wheelEvent(self, event) - - -class reconnectQPushButton(QPushButton, QThread): - def __init__(self, parent=None): - super().__init__() - self.parent = parent - self.clicked.connect(self.onClicked) - self.isdirty = False - self._handles_to_reconnect = [] - self.reconnectThread = None - - def onClicked(self, event): - - self._handles_to_reconnect = [] - - for i in range(0, len(self.parent.pv_gateway)): - if self.parent.item( - i, self.parent.no_columns-1).checkState() == Qt.Checked: - self._handles_to_reconnect.append( - self.parent.pv_gateway[i].handle) - - self.reconnect() - QApplication.processEvents() - - def reconnect(self): - QApplication.processEvents() - - self.isdirty = True - if self._handles_to_reconnect: - self.parent.cafe.reconnect(self._handles_to_reconnect) - self.isdirty = False - #Uncheck reconnected channels - for i in range(0, len(self.parent.pv_gateway)): - if self.parent.item( - i, self.parent.no_columns-1).checkState() == Qt.Checked: - if self.parent.cafe.isConnected( - self.parent.pv_gateway[i].handle): - self.parent.item( - i, self.parent.no_columns-1).setCheckState(False) - - #Uncheck global reconnect check box - self.parent.cb_item_all.setCheckState(Qt.Unchecked) - - -class CAQTableWidget(QTableWidget): - '''Channel access enabled QTableWidget widget''' - #trigger_monitor_float = Signal(float, int, int) - #trigger_monitor_int = Signal(int, int, int) - #trigger_monitor_str = Signal(str, int, int) - #trigger_connect = Signal(int, str, int) - - def hasNewData(self, _row, pv_data): - - if self.pv_gateway[_row].pvd_previous is None: - return True - - newDataFlag = False - - if self.pv_gateway[_row].pvd_previous.ts[1] != pv_data.ts[1]: - newDataFlag = True - elif self.pv_gateway[_row].pvd_previous.ts[0] != pv_data.ts[0]: - newDataFlag = True - # Catch disconnect events(!!) and set newDataFlag only - elif self.pv_gateway[_row].pvd_previous.status != pv_data.status: - newDataFlag = True - return newDataFlag - - - def paint_rows(self, row_range: list = [], reset=False, last_row=[" ", " "], - columns=[0]): - - _qcolor_last_line = QColor("#d1e8e9") - self.font_pts11 = QTableWidgetItem().font() - self.font_pts11.setPixelSize(11) - if reset: - _qcolor = self.item(0, self.columnCount()-1).background() - _start = 0 - _end = self.rowCount()-1 - else: - _qcolor = _qcolor_last_line - _start = row_range[0] - _end = row_range[1] - - for _row in range(_start, _end): - _cell = QTableWidgetItem("{0}".format(_row+1)) - if not reset: - _cell.setFont(self.font_pts11) - _cell.setBackground(_qcolor) - - if 1 in columns: - self.item(_row, 0).setBackground(_qcolor) - self.item(_row, 0).setFont(self.font_pts11) - if 0 in columns: - self.setVerticalHeaderItem(_row, _cell) - - - #last row - - if reset and 0 in columns: - _cell = QTableWidgetItem("{0}".format(last_row[0])) - _cell.setFont(self.font_pts11) - self.setVerticalHeaderItem(self.rowCount()-1, _cell) - - self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter) - self.item(self.rowCount()-1, 0).setText(str(last_row[1])) - self.item(self.rowCount()-1, 0).setBackground(_qcolor) - self.item(self.rowCount()-1, 0).setFont(self.font_pts11) - elif last_row[0] != " ": - _cell = QTableWidgetItem("{0}".format(last_row[0])) - _cell.setBackground(_qcolor_last_line) - _cell.setFont(self.font_pts11) - self.setVerticalHeaderItem(self.rowCount()-1, _cell) - - if columns: - self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter) - self.item(self.rowCount()-1, 0).setText(str(last_row[1])) - self.item(self.rowCount()-1, 0).setBackground(_qcolor_last_line) - self.item(self.rowCount()-1, 0).setFont(self.font_pts11) - - - def widget_update(self): - - for _row, pvgate in enumerate(self.pv_gateway): - #for _row in range(0, len(self.pv_gateway)): - if not pvgate.notify_unison: - continue - _handle = pvgate.handle - _pvd = pvgate.cafe.getPVCache(_handle) - - if _pvd.status in (self.cyca.ICAFE_CS_NEVER_CONN, - self.cyca.ICAFE_CA_OP_CONN_DOWN): - pvgate.pvd_previous = _pvd - continue - - pvgate.pvd_previous = _pvd - - #if timestamps the same - then skip - _value = _pvd.value[0] - _value = pvgate.format_display_value(_value) - - qtwi = QTableWidgetItem(str(_value)+ " ") - f = qtwi.font() - f.setPointSize(8) - qtwi.setFont(f) - - self.setItem(_row, self.no_columns-3, - QTableWidgetItem(qtwi)) - self.item(_row, self.no_columns-3).setTextAlignment(Qt.AlignRight | - Qt.AlignVCenter) - - _ts_date = _pvd.tsDateAsString - _ts_str_len = len(_ts_date) - _ilength_target = self.format_ts_nano - - while _ts_str_len < _ilength_target: - _ts_date += "0" - _ilength_target = _ilength_target - 1 - _ts_str_len = len(_ts_date) - _ts_str = _ts_date[0: _ts_str_len - (self.format_ts_nano - - self.format_ts_decimal_part)] - _ts_str_len = len(_ts_str) - _ilength_target = self.format_ts_decimal_part - if self.format_ts_decimal_part == self.format_ts_deci: - if _ts_str_len == self.format_ts_sec: - _ts_str += "." - while _ts_str_len < _ilength_target: - _ts_str += "0" - _ilength_target = _ilength_target -1 - - qtwi = QTableWidgetItem(_ts_str) - f = qtwi.font() - f.setPointSize(8) - qtwi.setFont(f) - - self.setItem(_row, self.no_columns-2, QTableWidgetItem(qtwi)) - self.item(_row, self.no_columns-2).setTextAlignment(Qt.AlignCenter) - - _prop = pvgate.qt_dynamic_property_get() - - alarm_severity = _pvd.alarmSeverity - - if _prop == pvgate.READBACK_ALARM: - - if alarm_severity == pvgate.cyca.SEV_MAJOR: - _bgcolor = pvgate.fg_alarm_major - _fgcolor = "black" - elif alarm_severity == pvgate.cyca.SEV_MINOR: - _bgcolor = pvgate.fg_alarm_minor - _fgcolor = "black" - elif alarm_severity == pvgate.cyca.SEV_INVALID: - _bgcolor = pvgate.fg_alarm_invalid - _fgcolor = "#777777" - else: - _bgcolor = pvgate.fg_alarm_noalarm - _fgcolor = "black" - - #Colors for bg/fg reversed as is the old norm - self.item(_row, self.no_columns-3).setBackground( - QColor(_bgcolor)) - self.item(_row, self.no_columns-2).setBackground( - QColor(_bgcolor)) - self.item(_row, self.no_columns-3).setForeground( - QColor(_fgcolor)) - self.item(_row, self.no_columns-2).setForeground( - QColor(_fgcolor)) - - elif _prop == pvgate.READBACK_STATIC: - - self.item(_row, self.no_columns-3).setBackground( - QColor(pvgate.bg_readback)) - self.item(_row, self.no_columns-2).setBackground( - QColor(pvgate.bg_readback)) - - elif _prop == pvgate.DISCONNECTED: - self.item(_row, self.no_columns-3).setBackground( - QColor("#ffffff")) - self.item(_row, self.no_columns-2).setBackground( - QColor("#ffffff")) - self.item(_row, self.no_columns-3).setForeground( - QColor("#777777")) - self.item(_row, self.no_columns-2).setForeground( - QColor("#777777")) - - else: - print(_prop, "widget_update unknown in element/row", _row, - _row+1) - - QApplication.processEvents() - - def __init__(self, parent=None, pv_list: list = ["PV_NAME_NOT_GIVEN"], - monitor_callback=None, pv_within_daq_group: bool = False, - color_mode=None, show_units: bool = True, prefix: str = "", - suffix: str = "", ts_res: str = "milli", - init_column: bool = False, init_list: list = [], - notify_freq_hz: int = 0, notify_unison: bool = True, - precision: int = 0, scale_factor: float = 1, - show_timestamp: bool = True, pv_list_show: list = None): - - super().__init__() - self.columns_dict = {} - _column_dict_value = 0 - self.columns_dict['PV'] = _column_dict_value - if init_column: - _column_dict_value += 1 - self.columns_dict['Init'] = _column_dict_value - _column_dict_value += 1 - self.columns_dict['Value'] = _column_dict_value - if show_timestamp: - _column_dict_value += 1 - self.columns_dict['Timestamp'] = _column_dict_value - _column_dict_value += 1 - self.columns_dict['Reconnect'] = _column_dict_value - - self.setWindowModality(Qt.ApplicationModal) - self.no_columns = _column_dict_value + 1 - - self.init_column = init_column - - self.init_list = init_list - if self.init_column and not self.init_list: - self.init_list = pv_list - - self.icount = 0 - self.notify_freq_hz = abs(notify_freq_hz) - self.notify_freq_hz_default = self.notify_freq_hz - self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \ - 1000 / self.notify_freq_hz - - self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz) - - self.precision = precision - self.scale_factor = scale_factor - self.show_timestamp = show_timestamp - - self.format_ts_nano = 31 #max length of date - self.format_ts_micro = 28 - self.format_ts_milli = 25 - self.format_ts_deci = 23 #-8 - self.format_ts_sec = 21 - if "nano" in ts_res.lower(): - self.format_ts_decimal_part = self.format_ts_nano - elif "micro" in ts_res.lower(): - self.format_ts_decimal_part = self.format_ts_micro - elif "milli" in ts_res.lower(): - self.format_ts_decimal_part = self.format_ts_milli - elif "deci" in ts_res.lower(): - self.format_ts_decimal_part = self.format_ts_deci - elif "sec" in ts_res.lower(): - self.format_ts_decimal_part = self.format_ts_sec - else: - self.format_ts_decimal_part = self.format_ts_milli - - self.pv2item_dict = {} - - self.pv_list = pv_list - self.pv_gateway = [None] * len(self.pv_list) - - self.pv_list_show = pv_list_show - if self.pv_list_show is None: - self.pv_list_show = self.pv_list - - _color_mode = [None] * len(self.pv_list) - - if isinstance(color_mode, list): - for i in range(0, len(color_mode)): - _color_mode[i] = color_mode[i] - - for i in range(0, len(self.pv_list)): - - self.pv_gateway[i] = PVGateway( - parent, self.pv_list[i], monitor_callback, - pv_within_daq_group, _color_mode[i], show_units, prefix, suffix, - connect_triggers=False, notify_freq_hz=self.notify_freq_hz, - notify_unison=self.notify_unison, precision=self.precision) - - self.pv_gateway[i].is_initialize_complete() - self.pv_gateway[i].trigger_connect.connect( - self.receive_connect_update) - self.pv_gateway[i].trigger_monitor_str.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_int.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_float.connect( - self.receive_monitor_update) - - self.pv_gateway[i].widget_class = "QTableWidgetItem" - - self.pv_gateway[i].qt_property_initial_values( - qt_object_name=self.pv_gateway[i].PV_READBACK, tool_tip=False) - - #required for reconnect - self.cafe = self.pv_gateway[0].cafe - self.cyca = self.pv_gateway[0].cyca - - self.timer = None - if self.notify_unison: - self.timer = QTimer() - self.timer.timeout.connect(self.widget_update) - self.timer.singleShot(0, self.widget_update) - self.timer.start(self.notify_milliseconds) - - self.configure_widget() - - #Connect only deals with colours - only helps on reconnect - # In any case monitors take over - #Got to do this earlier or emit immediately after!! - for i in range(0, len(self.pv_gateway)): - if self.cafe.isConnected(self.pv_gateway[i].pv_name): - self.pv_gateway[i].trigger_connect.emit( - self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), - self.pv_gateway[i].cyca.ICAFE_CS_CONN) - - for i in range(0, len(self.pv_gateway)): - if not self.pv_gateway[i].pv_within_daq_group: - self.pv_gateway[i].monitor_start() - - self.update_init_values() - - self.configure_context_menu() - - - def configure_context_menu(self): - self.table_context_menu = QMenu() - self.table_context_menu.setObjectName("contextMenu") - self.table_context_menu.setWindowModality(Qt.NonModal) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.table_context_menu.addSection("---") - - action1 = QAction("Configure Table PVs", self) - action1.triggered.connect(self.display_table_parameters) - self.table_context_menu.addAction(action1) - - if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): - self.table_context_menu.addSection("---") - - QApplication.processEvents() - - - def restore_init_values(self, pv_list: list = []): - _set_values_dict = self.get_init_values() - - if not pv_list: - _pvs_to_set, _values_to_set = zip(*_set_values_dict.items()) - #zip returns tuples - _pvs_to_set = list(_pvs_to_set) - _values_to_set = list(_values_to_set) - else: - _pvs_to_set = [] - _values_to_set = [] - for pv in pv_list: - if pv in _set_values_dict.keys(): - _pvs_to_set.append(pv) - _values_to_set.append(_set_values_dict[pv]) - - status, status_list = self.cafe.setScalarList(_pvs_to_set, - _values_to_set) - - if status != self.cyca.ICAFE_NORMAL: - _mess = ("The following device(s) reported an error " + - "in 'set' operation:") - for i, status_value in enumerate(status_list): - if status_value != self.cyca.ICAFE_NORMAL: - _mess += ("\n" + _pvs_to_set[i] + " has status = " + - str(status_value) + " " + - self.cafe.getStatusCodeAsString(status_value) + - " " + self.cafe.getStatusInfo(status_value)) - qm = QMessageBox() - qm.setText(_mess) - - qm.exec() - QApplication.processEvents() - - self.init_value_button.setEnabled(True) - - - def is_same_as_init_values(self): - _init_values_dict = self.get_column_values(self.columns_dict['Init']) - _pvs, _init_values = zip(*_init_values_dict.items()) - _current_values_dict = self.get_column_values( - self.columns_dict['Value']) - _pvs, _current_values = zip(*_current_values_dict.items()) - #zip returns tuples - - return bool(func_reduce(lambda i, j: i and j, map( - lambda m, k: m == k, _init_values, _current_values), True)) - - #if func_reduce(lambda i, j: i and j, map( - # lambda m, k: m == k, _init_values, _current_values), True): - # return True - #else: - # return False - - - def get_column_values(self, column_no): - _values_dict = {} - _start = 0 - _end = len(self.pv_gateway) - _pvs = [None] * _end - _values_str = [None] * _end - _values = [None] * _end - - for _row in range(_start, _end): - _values_str[_row] = self.item(_row, column_no).text() - _pvs[_row] = self.item(_row, 0).text() - - _value_list = [float(_value_list) for _value_list in re.findall( - r'-?\d+\.?\d*', _values_str[_row])] - - if not _value_list: - print("row", _row, "values", _values_str[_row], _pvs[_row]) - _values[_row] = _values_str[_row] #Can be enum string - else: - _values[_row] = _value_list[0] - - if _pvs[_row] in self.pv_list_show: - _values_dict[self.pv_gateway[_row].pv_name] = _values[_row] - - return _values_dict #_pvs_to_set, _values_to_set - - - def get_init_values(self): - return self.get_column_values(self.columns_dict['Init']) - - def get_init_values_previous(self): - _set_values_dict = {} - _start = 0 - _end = len(self.pv_gateway) - _pvs_to_set = [None] * _end - _values_to_set_str = [None] * _end - _values_to_set = [None] * _end - for _row in range(_start, _end): - _values_to_set_str[_row] = self.item( - _row, self.columns_dict['Init']).text() - _pvs_to_set[_row] = self.item(_row, self.columns_dict['PV']).text() - - _value_list = [float(_value_list) for _value_list in re.findall( - r'-?\d+\.?\d*', _values_to_set_str[_row])] - - if not _value_list: - print("//row", _row, "values", _values_to_set_str[_row], - _pvs_to_set[_row]) - _values_to_set[_row] = _values_to_set_str[_row] #Can be enum str - else: - _values_to_set[_row] = _value_list[0] - - - if _pvs_to_set[_row] in self.init_list: - _set_values_dict[ - self.pv_gateway[_row].pv_name] = _values_to_set[_row] - - return _set_values_dict - - - def update_init_values(self): - _start = 0 - _end = len(self.pv_gateway) - - for _row in range(_start, _end): - _handle = self.pv_gateway[_row].handle - _value = self.pv_gateway[_row].cafe.getCache(_handle) - - if _value is not None: - if self.scale_factor != 1: - _value = _value * self.scale_factor - _value = self.pv_gateway[_row].format_display_value(_value) - - qtwi = QTableWidgetItem(str(_value)+ " ") - _f = qtwi.font() - _f.setPointSize(8) - qtwi.setFont(_f) - self.setItem(_row, 1, qtwi) - self.item(_row, 1).setTextAlignment(Qt.AlignRight | Qt.AlignVCenter) - - - def configure_widget(self): - - _column_width_pvname = 180 - _column_width_value = 90 - _column_width_timestamp = 210 - _column_width_checkbox = 22 - - self.setRowCount(len(self.pv_gateway)+1) - self.setColumnCount(self.no_columns) - self.setEditTriggers(QAbstractItemView.NoEditTriggers) - self.resizeColumnsToContents() - self.resizeRowsToContents() - #self.horizontalHeader().setStretchLastSection(True); - self.setColumnWidth(self.columns_dict['PV'], _column_width_pvname) - - self.setColumnWidth(self.columns_dict['Value'], _column_width_value) - if 'Init' in self.columns_dict.keys(): - self.setColumnWidth(self.columns_dict['Init'], _column_width_value) - if 'Timestamp' in self.columns_dict.keys(): - self.setColumnWidth(self.columns_dict['Timestamp'], - _column_width_timestamp) - self.setColumnWidth(self.columns_dict['Reconnect'], - _column_width_checkbox) - - _pv_column = self.columns_dict['PV'] - - for i in range(0, len(self.pv_gateway)): - qtwt = QTableWidgetItem(self.pv_list_show[i]) - f = qtwt.font() - f.setPointSize(8) - qtwt.setFont(f) - - self.setItem(i, _pv_column, qtwt) - self.item(i, _pv_column).setTextAlignment(Qt.AlignHCenter | - Qt.AlignVCenter) - for i_column in range(1, self.no_columns-1): - self.setItem(i, i_column, QTableWidgetItem(str(""))) - self.item(i, i_column).setTextAlignment(Qt.AlignHCenter | - Qt.AlignVCenter) - self.pv2item_dict[self.pv_gateway[i]] = i - - cb_item = QTableWidgetItem() - cb_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) - cb_item.setCheckState(Qt.Unchecked) - cb_item.setTextAlignment(Qt.AlignCenter) - cb_item.setToolTip(self.pv_gateway[i].pv_name) - - self.setItem(i, self.no_columns-1, cb_item) - self.item(i, self.no_columns-1).setTextAlignment(Qt.AlignCenter) - - if self.init_column: - self.init_widget = QWidget() - _init_layout = QHBoxLayout(self.init_widget) - self.init_value_button = QPushButton() - self.init_value_button.setText("Update") - _f = self.init_value_button.font() - _f.setPointSize(8) - self.init_value_button.setFont(_f) - self.init_value_button.setFixedWidth(80) - self.init_value_button.clicked.connect(self.update_init_values) - self.init_value_button.setToolTip( - ("Stores initial, pre-measurement value. Update is also " + - "typically executed automatically before new optics are set.")) - _init_layout.addWidget(self.init_value_button) - _init_layout.setAlignment(Qt.AlignRight) - _init_layout.setContentsMargins(1, 1, 0, 0) #Required - self.init_widget.setLayout(_init_layout) - self.setCellWidget(len(self.pv_gateway), 1, self.init_widget) - - _restore_widget = QWidget() - _restore_layout = QHBoxLayout(_restore_widget) - self.restore_value_button = QPushButton() - self.restore_value_button.setStyleSheet( - "QPushButton{background-color: rgb(212, 219, 157);}") - self.restore_value_button.setText("Restore") - _f = self.restore_value_button.font() - _f.setPointSize(8) - self.restore_value_button.setFont(_f) - self.restore_value_button.setFixedWidth(80) - self.restore_value_button.clicked.connect(self.restore_init_values) - self.restore_value_button.setToolTip( - ("Restore devices to their pre-measurement values")) - _restore_layout.addWidget(self.restore_value_button) - _restore_layout.setAlignment(Qt.AlignRight) - _restore_layout.setContentsMargins(1, 1, 0, 0) - _restore_widget.setLayout(_restore_layout) - self.setCellWidget(len(self.pv_gateway), 2, _restore_widget) - - #Do not display no for last row (Reconnect button) - _row_digit_last_cell = QTableWidgetItem(str("")) - self.setVerticalHeaderItem(len(self.pv_gateway), _row_digit_last_cell) - self.setItem(len(self.pv_gateway), 0, QTableWidgetItem(str(""))) - - _qwb = QWidget() - - self.reconnect_button = reconnectQPushButton(self) #self required - - f = self.reconnect_button.font() - - if 'Timestamp' in self.columns_dict.keys(): - f.setPointSize(8) - self.reconnect_button.setFixedWidth(100) - else: - f.setPointSize(6) - self.reconnect_button.setFixedWidth(58) - - self.reconnect_button.setFont(f) - - self.reconnect_button.setText("Reconnect") - - _layout = QHBoxLayout(_qwb) - _layout.addWidget(self.reconnect_button) - _layout.setAlignment(Qt.AlignCenter) - _layout.setContentsMargins(0, 0, 0, 0) #Required - - #_reconnect_button - self.setCellWidget(len(self.pv_gateway), self.no_columns-2, _qwb) - - self.cb_item_all = QCheckBox() - self.cb_item_all.setCheckState(Qt.Unchecked) - self.cb_item_all.stateChanged.connect(self.reconnectStateChanged) - self.cb_item_all.setObjectName("Reconnect") - - self.setCellWidget(len(self.pv_gateway), self.no_columns-1, - self.cb_item_all) - - header_item = QTableWidgetItem("Process Variable") - - self.setHorizontalHeaderItem(self.columns_dict['PV'], header_item) - - if 'Init' in self.columns_dict.keys(): - self.setHorizontalHeaderItem(self.columns_dict['Init'], - QTableWidgetItem("Initial Value")) - - self.setHorizontalHeaderItem(self.columns_dict['Value'], - QTableWidgetItem("Value")) - - if 'Timestamp' in self.columns_dict.keys(): - self.setHorizontalHeaderItem(self.columns_dict['Timestamp'], - QTableWidgetItem("Timestamp")) - self.setHorizontalHeaderItem(self.columns_dict['Reconnect'], - QTableWidgetItem("R")) - self.setFocusPolicy(Qt.NoFocus) - self.setEditTriggers(QAbstractItemView.NoEditTriggers) - self.setSelectionMode(QAbstractItemView.NoSelection) - - self.verticalHeader().setDefaultAlignment(Qt.AlignRight) - self.verticalHeader().setFixedWidth(22) - - _fm_font = QFont("Sans Serif") - _fm_font.setPointSize(12) - fm = QFontMetricsF(_fm_font) - - _factor = 1 - if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): - _factor = 1.18 - - self.setFixedHeight( - int(fm.lineSpacing() * _factor * (len(self.pv_gateway)+3))) - _min_table_width = 620 if not self.init_column else 650 - self.setMinimumWidth(_min_table_width) - - for _row in range(0, len(self.pv_gateway)): - self.item(_row, _pv_column).setForeground(QColor("#000000")) - - for i_column in range(1, self.no_columns-2): - self.item(_row, i_column).setForeground(QColor("#000000")) - self.item(_row, i_column).setTextAlignment(Qt.AlignRight | - Qt.AlignVCenter) - - self.item(_row, self.columns_dict['Value']).setBackground( - QColor("#ffffff")) - if 'Timestamp' in self.columns_dict.keys(): - self.item(_row, - self.columns_dict['Timestamp']).setTextAlignment( - Qt.AlignCenter) - self.item(_row, - self.columns_dict['Timestamp']).setBackground( - QColor("#ffffff")) - - @Slot(int) - def reconnectStateChanged(self, state): - if state == Qt.Unchecked: - for i in range(0, len(self.pv_gateway)): - self.item(i, self.columns_dict['Reconnect']).setCheckState( - Qt.Unchecked) - else: - for i in range(0, len(self.pv_gateway)): - self.item(i, self.columns_dict['Reconnect']).setCheckState( - Qt.Checked) - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - - _row = self.pv2item_dict[self.sender()] - self.pv_gateway[_row].time_monotonic = time.monotonic() - if self.scale_factor != 1: - value = value * self.scale_factor - _value = self.pv_gateway[_row].format_display_value(value) - - qtwi = QTableWidgetItem(str(_value) + " ") - f = qtwi.font() - f.setPointSize(8) - qtwi.setFont(f) - self.setItem(_row, self.columns_dict['Value'], qtwi) - self.item(_row, self.columns_dict['Value']).setTextAlignment( - Qt.AlignRight | Qt.AlignVCenter) - - if 'Timestamp' in self.columns_dict.keys(): - _handle = self.pv_gateway[_row].handle - _pvd = self.pv_gateway[_row].cafe.getPVCache(_handle) - _ts_date = _pvd.tsDateAsString - _ts_str_len = len(_ts_date) - _ilength_target = self.format_ts_nano - - while _ts_str_len < _ilength_target: - _ts_date += "0" - _ilength_target = _ilength_target -1 - - ##ts_str_len = len(_ts_date) - _ts_str = _ts_date[0: _ts_str_len-( - self.format_ts_nano-self.format_ts_decimal_part)] - _ts_str_len = len(_ts_str) - - _ilength_target = self.format_ts_decimal_part - if self.format_ts_decimal_part == self.format_ts_deci: - if _ts_str_len == self.format_ts_sec: - _ts_str += "." - while _ts_str_len < _ilength_target: - _ts_str += "0" - _ilength_target = _ilength_target -1 - - qtwi = QTableWidgetItem(_ts_str) - f = qtwi.font() - f.setPointSize(8) - qtwi.setFont(f) - - self.setItem(_row, self.columns_dict['Timestamp'], qtwi) - self.item(_row, self.columns_dict['Timestamp']).setTextAlignment( - Qt.AlignCenter) - - _prop = self.pv_gateway[_row].qt_dynamic_property_get() - - if _prop == self.pv_gateway[_row].READBACK_ALARM: - - if alarm_severity == self.pv_gateway[_row].cyca.SEV_MAJOR: - _bgcolor = self.pv_gateway[_row].settings.fgAlarmMajor - _fgcolor = "black" - elif alarm_severity == self.pv_gateway[_row].cyca.SEV_MINOR: - _bgcolor = self.pv_gateway[_row].settings.fgAlarmMinor - _fgcolor = "black" - elif alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID: - _bgcolor = self.pv_gateway[_row].settings.fgAlarmInvalid - _fgcolor = "#777777" - else: - _bgcolor = self.pv_gateway[_row].settings.fgAlarmNoAlarm - _fgcolor = "black" - - #Colors for bg/fg reversed as is the old norm - self.item(_row, self.columns_dict['Value']).setBackground( - QColor(_bgcolor)) - self.item(_row, self.columns_dict['Value']).setForeground( - QColor(_fgcolor)) - if 'Timestamp' in self.columns_dict.keys(): - self.item(_row, self.columns_dict['Timestamp']).setBackground( - QColor(_bgcolor)) - self.item(_row, self.columns_dict['Timestamp']).setForeground( - QColor(_fgcolor)) - - - elif _prop == self.pv_gateway[_row].DISCONNECTED or \ - alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID: - self.item(_row, self.columns_dict['Value']).setBackground( - QColor("#ffffff")) - self.item(_row, self.columns_dict['Value']).setForeground( - QColor("#777777")) - - if 'Timestamp' in self.columns_dict.keys(): - self.item(_row, self.columns_dict['Timestamp']).setBackground( - QColor("#ffffff")) - self.item(_row, self.columns_dict['Timestamp']).setForeground( - QColor("#777777")) - - - elif _prop == self.pv_gateway[_row].READBACK_STATIC: - self.item(_row, self.columns_dict['Value']).setBackground( - QColor(self.pv_gateway[_row].bg_readback)) - if 'Timestamp' in self.columns_dict.keys(): - self.item(_row, self.columns_dict['Timestamp']).setBackground( - QColor(self.pv_gateway[_row].bg_readback)) - else: - - print(_prop, self.pv_gateway[_row].DISCONNECTED, - "(in monitor) unknown in element/row no.", _row, _row+1) - - QApplication.processEvents(QEventLoop.AllEvents, 10) - - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - _row = self.pv2item_dict[self.sender()] - - self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, - post_display=False) - - _prop = self.pv_gateway[_row].qt_dynamic_property_get() - - #self.post_display_value(status) - if _prop == self.pv_gateway[_row].DISCONNECTED: - self.item(_row, self.columns_dict['Value']).setBackground( - QColor("#ffffff")) - self.item(_row, self.columns_dict['Value']).setForeground( - QColor("#777777")) - if 'Timestamp' in self.columns_dict.keys(): - self.item(_row, self.columns_dict['Timestamp']).setBackground( - QColor("#ffffff")) - self.item(_row, self.columns_dict['Timestamp']).setForeground( - QColor("#777777")) - - QApplication.processEvents() - - def table_precision_user_changed(self, new_value): - self.pvgateway_precision = new_value - - for pvgate in self.pv_gateway: - if pvgate.pv_ctrl is not None: - self.pvgateway_precision = min(pvgate.pv_ctrl.precision, - new_value) - - pvgate.precision_user = self.pvgateway_precision - pvgate.precision = self.pvgateway_precision - - _pvd = self.cafe.getPVCache(pvgate.handle) - - if _pvd.value[0] is not None: - if isinstance(_pvd.value[0], float): - pvgate.trigger_monitor_float.emit( - _pvd.value[0], _pvd.status, _pvd.alarmSeverity) - - - def table_precision_ioc_reset(self): - if self.max_precision_value == self.table_precision_user_wgt.value(): - self.table_precision_user_changed(self.max_precision_value) - else: - self.table_precision_user_wgt.setValue(self.max_precision_value) - - def table_refresh_rate_changed(self, new_idx): - - _notify_freq_hz = self.refresh_freq_combox_idx_dict[new_idx] - _notify_milliseconds = 0 if _notify_freq_hz == 0 else \ - 1000 / _notify_freq_hz - - self.notify_freq_hz = _notify_freq_hz - - if _notify_milliseconds == 0: - for pvgate in self.pv_gateway: - pvgate.notify_unison = False - pvgate.notify_milliseconds = _notify_milliseconds - pvgate.notify_freq_hz = self.notify_freq_hz - pvgate.monitor_stop() - time.sleep(0.01) - for pvgate in self.pv_gateway: - pvgate.monitor_start() - - else: - for pvgate in self.pv_gateway: - if not pvgate.notify_unison: - pvgate.monitor_stop() - - for pvgate in self.pv_gateway: - pvgate.notify_milliseconds = _notify_milliseconds - pvgate.notify_freq_hz = self.notify_freq_hz - - if not pvgate.notify_unison: - pvgate.notify_unison = True - pvgate.monitor_start() - else: - - self.cafe.updateMonitorPolicyDeltaMS( - pvgate.handle, pvgate.monitor_id, - pvgate.notify_milliseconds) - - if self.timer is not None: - self.timer.stop() - else: - self.timer = QTimer() - self.timer.timeout.connect(self.widget_update) - self.timer.singleShot(0, self.widget_update) - - if _notify_milliseconds > 0: - self.timer.start(_notify_milliseconds) - - def table_ts_resolution_changed(self, new_idx): - - for i, ts_res in enumerate(self.ts_combox_idx_dict.values()): - if i == new_idx: - self.format_ts_decimal_part = ts_res - break - - for pvgate in self.pv_gateway: - _pvd = self.cafe.getPVCache(pvgate.handle) - if _pvd.value[0] is not None: - if isinstance(_pvd.value[0], float): - pvgate.trigger_monitor_float.emit( - _pvd.value[0], _pvd.status, _pvd.alarmSeverity) - elif isinstance(_pvd.value[0], int): - pvgate.trigger_monitor_int.emit( - _pvd.value[0], _pvd.status, _pvd.alarmSeverity) - else: - pvgate.trigger_monitor_str.emit( - str(_pvd.value[0]), _pvd.status, _pvd.alarmSeverity) - - - def display_table_parameters(self): - display_wgt = QDialog(self) - display_wgt.setWindowTitle("PV Parameters") - layout = QVBoxLayout() - common_label_width = 120 - common_wgt_width = 160 - common_hbox_width = common_label_width + common_wgt_width + 20 - - self.initial_value = 0 - self.max_precision_value = 0 - for i, pvgate in enumerate(self.pv_gateway): - if pvgate.pv_ctrl is not None: - if pvgate.pv_ctrl.precision > 0: - self.max_precision_value = max(self.max_precision_value, - pvgate.pv_ctrl.precision) - self.initial_value = max(self.initial_value, - pvgate.precision) - - if self.max_precision_value > 0: - #precision user - _hbox_wgt = QWidget() - _hbox = QHBoxLayout() - precision_user_label = QLabel("Precision (user):") - self.table_precision_user_wgt = QSpinBox(self) - self.table_precision_user_wgt.setFocusPolicy(Qt.NoFocus) - self.table_precision_user_wgt.setValue(self.initial_value) - self.table_precision_user_wgt.setMaximum(self.max_precision_value) - self.table_precision_user_wgt.valueChanged.connect( - self.table_precision_user_changed) - precision_user_label.setAlignment(Qt.AlignLeft) - self.table_precision_user_wgt.setAlignment(Qt.AlignLeft) - _hbox.addWidget(precision_user_label) - _hbox.addWidget(self.table_precision_user_wgt) - _hbox.setAlignment(Qt.AlignLeft) - _hbox_wgt.setLayout(_hbox) - - precision_user_label.setFixedWidth(common_label_width) - self.table_precision_user_wgt.setFixedWidth(40) - _hbox_wgt.setFixedWidth(common_hbox_width) - - #precision ioc - _hbox2_wgt = QWidget() - _hbox2 = QHBoxLayout() - precision_ioc_label = QLabel("Precision (ioc): ") - precision_ioc = QPushButton(self) - precision_ioc.setText("Reset") - precision_ioc.clicked.connect(self.table_precision_ioc_reset) - precision_ioc_label.setAlignment(Qt.AlignLeft) - - _hbox2.addWidget(precision_ioc_label) - _hbox2.addWidget(precision_ioc) - _hbox2.setAlignment(Qt.AlignLeft) - - _hbox2_wgt.setLayout(_hbox2) - - precision_ioc_label.setFixedWidth(common_label_width) - precision_ioc.setFixedWidth(50) - - _hbox2_wgt.setFixedWidth(common_hbox_width) - - layout.addWidget(_hbox_wgt) - layout.addWidget(_hbox2_wgt) - - if 'Timestamp' in self.columns_dict.keys(): - #time-stamp - _hbox4_wgt = QWidget() - _hbox4 = QHBoxLayout() - ts_label = QLabel("Timestamp: ") - - self.ts_combox_idx_dict = { - 'second (s)': self.format_ts_sec, - 'decisecond (ds)': self.format_ts_deci, - 'millisecond (ms)': self.format_ts_milli, - 'microsecond (\u03bcs)': self.format_ts_micro, - 'nanosecond (ns)': self.format_ts_nano} - - ts_resolution = QComboBox(self) - for key, ts_res in self.ts_combox_idx_dict.items(): - ts_resolution.addItem(key) - - _current_idx = 0 - - for i, (key, ts_res) in enumerate(self.ts_combox_idx_dict.items()): - if ts_res == self.format_ts_decimal_part: - _current_idx = i - break - - ts_resolution.setCurrentIndex(_current_idx) - ts_resolution.currentIndexChanged.connect( - self.table_ts_resolution_changed) - - _hbox4.addWidget(ts_label) - _hbox4.addWidget(ts_resolution) - _hbox4_wgt.setLayout(_hbox4) - - ts_label.setFixedWidth(common_label_width) - ts_resolution.setFixedWidth(common_wgt_width) - _hbox4_wgt.setFixedWidth(common_hbox_width) - - layout.addWidget(_hbox4_wgt) - - #precision refresh rate - _hbox3_wgt = QWidget() - _hbox3 = QHBoxLayout() - refresh_freq_label = QLabel("Refresh rate: ") - #_default_refresh_val = 0 if self.notify_freq_hz <= 0 else \ - # self.notify_freq_hz - _default_refresh_val = 0 if self.notify_freq_hz_default <= 0 else \ - self.notify_freq_hz_default - - self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1, - 5: 0.5, 6: _default_refresh_val} - refresh_freq = QComboBox(self) - refresh_freq.addItem('direct') - refresh_freq.addItem('{0} Hz'.format( - self.refresh_freq_combox_idx_dict[1])) - refresh_freq.addItem('{0} Hz'.format( - self.refresh_freq_combox_idx_dict[2])) - refresh_freq.addItem('{0} Hz'.format( - self.refresh_freq_combox_idx_dict[3])) - refresh_freq.addItem('{0} Hz'.format( - self.refresh_freq_combox_idx_dict[4])) - refresh_freq.addItem('{0} Hz'.format( - self.refresh_freq_combox_idx_dict[5])) - - _default_text = 'default (direct)' if _default_refresh_val == 0 else \ - 'default ({0} Hz)'.format(self.refresh_freq_combox_idx_dict[6]) - - refresh_freq.addItem(_default_text) - - for key, value in self.refresh_freq_combox_idx_dict.items(): - if value == self.notify_freq_hz: - refresh_freq.setCurrentIndex(key) - break - - refresh_freq.currentIndexChanged.connect( - self.table_refresh_rate_changed) - - _hbox3.addWidget(refresh_freq_label) - _hbox3.addWidget(refresh_freq) - _hbox3_wgt.setLayout(_hbox3) - - refresh_freq_label.setFixedWidth(common_label_width) - refresh_freq.setFixedWidth(common_wgt_width) - _hbox3_wgt.setFixedWidth(common_hbox_width) - - layout.addWidget(_hbox3_wgt) - - layout.setAlignment(Qt.AlignLeft) - layout.setContentsMargins(10, 0, 0, 0) - layout.setSpacing(0) - - display_wgt.setMinimumWidth(340) - display_wgt.setLayout(layout) - - display_wgt.exec() - - - def mousePressEvent(self, event): - row = self.indexAt(event.pos()).row() - - if row > -1: - if row < len(self.pv_list): - self.pv_gateway[row].mousePressEvent(event) - else: - button = event.button() - if button == Qt.RightButton: - self.table_context_menu.exec(QCursor.pos()) - self.clearFocus() - - #remove highlighting which persists after mouse leaves - def mouseMoveEvent(self, event): - pass - - def leaveEvent(self, event): - self.clearSelection() - self.clearFocus() - del event - - -class QMessageWidget(QListWidget): - """Log message window.""" - def __init__(self, parent=None): - super(QMessageWidget, self).__init__(parent) - self.myItem = None - self.setSelectionMode(QAbstractItemView.ExtendedSelection) - self.setFocusPolicy(Qt.StrongFocus) - - def leaveEvent(self, event): - if self.myItem: - self.clearSelection() - self.clearFocus() - del event - - def mousePressEvent(self, event): - item = self.itemAt(event.x(), event.y()) - if item: - self.myItem = item - self.setCurrentItem(self.myItem) - - def keyPressEvent(self, event): - if event.matches(QKeySequence.Copy): - nitem = event.count() - if nitem: - if self.myItem is not None: - _str = self.myItem.text() - QApplication.clipboard().setText(_str) - - - -class QResultsWidget: - """Results table""" - def __init__(self, summary_dict=None, table_dict=None): - - self.summary_dict = summary_dict - self.table_dict = table_dict - self._group_box = None - - def group_box(self, title=""): - self._group_box = QGroupBox(title) - self._group_box.setObjectName("OUTERLEFT") - _vbox = QVBoxLayout() - _qspace = QFrame() - _qspace.setFixedHeight(10) - _vbox.addWidget(_qspace) - - _font = QFont("Sans Serif", 10) - - longest_str_item1 = "" - longest_str_item2 = "" - - for i, (label, text) in enumerate(self.summary_dict.items()): - if len(str(label)) > len(longest_str_item1): - longest_str_item1 = str(label) - if len(str(text)) > len(longest_str_item2): - longest_str_item2 = str(text) - - fm = QFontMetricsF(_font) - - _factor = 1.15 - - if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): - _factor = 1.18 - - qrect1 = fm.boundingRect(longest_str_item1) - qrect2 = fm.boundingRect(longest_str_item2) - _width_scaling_factor = 1.5 - _width_scaling_factor_le = 1.15 - _widget_height = 25 - for i, (label, text) in enumerate(self.summary_dict.items()): - #print(label, text) - qlabel = QLabel(label) - qle = QLineEdit(text) - qlabel.setFont(_font) - qlabel.setStyleSheet(("QLabel{color:black;" + - "margin:0px; padding:2px;}")) - qlabel.setFixedWidth(qrect1.width() * _width_scaling_factor) - qlabel.setFixedHeight(_widget_height) - - qle.setFocusPolicy(Qt.NoFocus) - qle.setFont(_font) - qle.setStyleSheet(("QLineEdit{color:blue;" + - "background-color: lightgray;" + - "qproperty-readOnly: true;" + - "margin:0px; padding:2px;}")) - qle.setFixedWidth(qrect2.width() * _width_scaling_factor_le) - qle.setFixedHeight(_widget_height) - qle.setAlignment(Qt.AlignRight) - - _hbox_widget = QWidget() - _hbox = QHBoxLayout() - _hbox.addWidget(qlabel) - _hbox.addWidget(qle) - _hbox_widget.setLayout(_hbox) - _hbox.setAlignment(Qt.AlignCenter) - _hbox.setContentsMargins(0, 2, 0, 0) - _vbox.addWidget(_hbox_widget) - - _vbox.setContentsMargins(0, 0, 0, 0) - _vbox.setAlignment(Qt.AlignCenter|Qt.AlignTop) - - _vbox2_widget = QWidget() - _vbox2 = QVBoxLayout() - _vbox2.setContentsMargins(0, 20, 0, 40) - table = QTableWidget(len(self.table_dict)-1, 2) - table.verticalHeader().setVisible(False) - table.setFocusPolicy(Qt.NoFocus) - #table.setFont(_font) - - longest_str_item1 = "" - longest_str_item2 = "" - - for i, (label, text) in enumerate(self.table_dict.items()): - item1 = QTableWidgetItem(str(label)) - item2 = QTableWidgetItem(str(text)) - item1.setTextAlignment(Qt.AlignCenter) - item2.setTextAlignment(Qt.AlignCenter) - item1.setForeground(QColor("black")) - item2.setForeground(QColor("black")) - if i%2 == 0: - item1.setBackground(QColor("lightgray")) - item2.setBackground(QColor("lightgray")) - - if len(str(label)) > len(longest_str_item1): - longest_str_item1 = str(label) - if len(str(text)) > len(longest_str_item2): - longest_str_item2 = str(text) - - if i == 0: - #item1.setFont(_font) - #item2.setFont(_font) - table.setHorizontalHeaderItem(0, item1) - table.setHorizontalHeaderItem(1, item2) - else: - table.setItem(i-1, 0, item1) - table.setItem(i-1, 1, item2) - - fm = QFontMetricsF(_font) - - _factor = 1.2 - - if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): - _factor = 1.18 - - qrect = fm.boundingRect(longest_str_item1 + longest_str_item2) - - _width_scaling_factor = 1.04 - table.resizeColumnsToContents() - table.resizeRowsToContents() - - table.setFixedHeight((fm.lineSpacing() * _factor * len( - self.table_dict)) + fm.lineSpacing()*2) - - table.setFixedWidth(((qrect.width()) * _width_scaling_factor)) - - _vbox2.addWidget(table) - _vbox2.setAlignment(Qt.AlignCenter|Qt.AlignTop) - _vbox2_widget.setLayout(_vbox2) - - _vbox.addWidget(_vbox2_widget) - - self._group_box.setLayout(_vbox) - self._group_box.setContentsMargins(20, 20, 20, 20) - self._group_box.setAlignment(Qt.AlignTop) - self._group_box.setFixedHeight( - table.height() + (_widget_height*len(self.summary_dict))) - self._group_box.setFixedWidth(table.width() + 20) - return self._group_box - - -class QResultsTableWidget(): - """Results table""" - def __init__(self, column_headings=None): - - self.column_headings = column_headings - self._group_box = None - - def group_box(self, title="Table of Results"): - self._group_box = QGroupBox(title) - self._group_box.setObjectName("OUTER") - - _font = QFont("Sans Serif", 10) - - _vbox2_widget = QWidget() - _vbox2 = QVBoxLayout() - _vbox2.setContentsMargins(0, 20, 0, 40) - table = QTableWidget(1, len(self.column_headings)) - table.verticalHeader().setVisible(True) - table.setFocusPolicy(Qt.NoFocus) - table.setFont(_font) - - for i, heading in enumerate(self.column_headings): - _item = QTableWidgetItem(str(heading)) - table.setHorizontalHeaderItem(i, _item) - - table.resizeColumnsToContents() - table.resizeRowsToContents() - table.setFixedHeight(400) - - _vbox2.addWidget(table) - _vbox2.setAlignment(Qt.AlignCenter|Qt.AlignTop) - _vbox2_widget.setLayout(_vbox2) - - self._group_box.setLayout(_vbox2) - self._group_box.setContentsMargins(20, 20, 20, 20) - self._group_box.setAlignment(Qt.AlignTop) - - self._group_box.setFixedWidth(table.width() + 20) - return self._group_box - - -class QHDFDockWidget(QDockWidget): - - def __init__(self, title=None, parent=None): - super().__init__(title, parent) - self.parent = parent - self.is_docked = True - self.geometry_from_qsettings = self.parent.application_geometry - self.topLevelChanged.connect(self._top_level_changed) - self.setVisible(False) - self.setFloating(False) - self.geometry_from_qsettings = self.parent.geometry() - - def closeEvent(self, event: QCloseEvent): - super().closeEvent(event) - - self.parent.setGeometry(self.geometry_from_qsettings) - self.setGeometry(self.geometry_from_qsettings) - QApplication.processEvents() - - self.parent.setGeometry(self.geometry_from_qsettings) - - def changeEvent(self, event): - pass - - def _top_level_changed(self, is_floating): - pass - - -class QNoDockWidget(QDockWidget): - - def __init__(self, title=None, parent=None): - super().__init__(title, parent) - self.parent = parent - self.is_docked = True - self.geometry_from_qsettings = self.parent.application_geometry - self.topLevelChanged.connect(self._top_level_changed) - self.setVisible(False) - self.setFloating(True) - - def changeEvent(self, event): - if "QAbstractButton" in str(self.sender()): - self.geometry_from_qsettings = self.parent.geometry() - - def _top_level_changed(self): #, is_floating): - self.setVisible(False) - self.setFloating(True) - #ResetGeometry - self.parent.setGeometry(self.geometry_from_qsettings) - QApplication.processEvents() - - - -class CAQStripChart(PlotWidget): - '''Channel access enabled pyqtgraph.PlotWidget''' - - def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'], - monitor_callback=None, pv_within_daq_group: bool = False, - color_mode=None, show_units: bool = False, prefix: str = "", - suffix: str = "", notify_freq_hz: int = 0, title: str = "", - ylabel: str = ""): - super().__init__() - - self.no_channels = len(pv_list) - - self.found = False - self.time_zero = [0] * self.no_channels - self.time_delta = [0] * self.no_channels - self.pv_list = pv_list - self.pv2item_dict = {} - self.pv_gateway = [None] * self.no_channels - - self.pvd_previous_list = [None] * self.no_channels - self.val_previous = [None] * self.no_channels - - self.curve = [None] * self.no_channels - - for i in range(0, len(self.pv_list)): - self.pv_gateway[i] = PVGateway( - parent, pv_list[i], monitor_callback, pv_within_daq_group, - color_mode, show_units, prefix, suffix, - #connect_callback=self.py_connect_callback, - connect_triggers=False, notify_freq_hz=notify_freq_hz, - monitor_dbr_time=True) - - self.pv_gateway[i].is_initialize_complete() - - self.pvd_previous_list[i] = self.pv_gateway[i].pvd - - self.pv_gateway[i].trigger_connect.connect( - self.receive_connect_update) - - self.pv_gateway[i].trigger_monitor_str.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_int.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_float.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor.connect( - self.receive_monitor_dbr_time) - - self.pv_gateway[i].widget_class = "PlotWidget" - self.pv2item_dict[self.pv_gateway[i]] = i - - self.cafe = self.pv_gateway[0].cafe - self.cyca = self.pv_gateway[0].cyca - for i in range(0, len(self.pv_gateway)): - if self.cafe.isConnected(self.pv_gateway[i].pv_name): - self.pv_gateway[i].trigger_connect.emit( - self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), - self.pv_gateway[i].cyca.ICAFE_CS_CONN) - - for i in range(0, len(self.pv_gateway)): - if not self.pv_gateway[i].pv_within_daq_group: - self.pv_gateway[i].monitor_start() - - sampleinterval = 0.2 - ##timewindow = 1800.0 - - self.ts_delta_max = 0.6 - - # Data stuff - self._interval = int(sampleinterval*1000) - self._bufsize = 9000 #int(timewindow/0.33) - self._bufsize2 = 9000 # int(timewindow/1.33) - self.databuffer = [None] * self.no_channels - self.timebuffer = [None] * self.no_channels - self.x = [None] * self.no_channels - self.y = [None] * self.no_channels - self.x_shifted = [None] * self.no_channels - - self.idx = [0] * self.no_channels - - for i in range(0, self.no_channels): - bsize = self._bufsize if i == 0 else self._bufsize2 - self.databuffer[i] = collections.deque([None]*bsize, bsize) - self.timebuffer[i] = collections.deque([0]*bsize, bsize) - self.x[i] = np.zeros(bsize, dtype=np.float) - self.y[i] = np.zeros(bsize, dtype=np.float) - - ##_long_size=20 - #self.data_series_buffer = collections.deque([0]*_long_size, _long_size) - #self.time_series_buffer = collections.deque([0]*_long_size, _long_size) - - #self.data_series = [] * self.no_channels - #self.time_series = [] * self.no_channels - - self.iflag_series = 0 - - #self.x = np.linspace(-timewindow, 0.0, self._bufsize) - #self.x_series = np.zeros(_long_size, dtype=np.float) - #self.y_series = np.zeros(_long_size, dtype=np.float) - if title is not None: - self.setTitle(str(title)) #self.pv_gateway[0].pv_name) - self.showGrid(x=True, y=True) - self.setLabel('left', ylabel, self.pv_gateway[0].units) - self.setLabel('bottom', 'time', 's') - self.setBackground((60, 60, 60)) #247, 236, 249)) - self.setLimits(yMin=-0.11) - - self.plotItem.setMouseEnabled(y=False) # Only allow zoom in X-axis - self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis - - pen_list = [(125, 249, 255), (255, 255, 0)] - - for i in range(0, len(self.pv_gateway)): - self.curve[i] = self.plot(self.x[0], self.y[0], pen=pen_list[i]) - - l = pg.LegendItem(offset=(0., 0.5), colCount=1) - l.setParentItem(self.graphicsItem()) - - l.setLabelTextColor((255, 255, 255)) - - for curv, pv in zip(self.curve, self.pv_gateway): - l.addItem(curv, pv.pv_name) - - QApplication.processEvents() - - @Slot(object, int) - def receive_monitor_dbr_time(self, pvdata, alarm_severity): - - #Check on alarm_severity?? - - _row = self.pv2item_dict[self.sender()] - - ts_now = pvdata.ts[0] + pvdata.ts[1] * 10**(-9) - ts_previous = (self.pvd_previous_list[_row].ts[0] + - self.pvd_previous_list[_row].ts[1] * 10**(-9)) - ##ts_delta = ts_now - ts_previous - - if (pvdata.ts[0] == self.pvd_previous_list[_row].ts[0]) and ( - pvdata.ts[1] == self.pvd_previous_list[_row].ts[1]): - pvdata.show() - self.pvd_previous_list[_row].show() - return - - value = pvdata.value[0] - #discard first callbacks - #if ts_delta > 2.0: - # self.pvd_previous_list[_row] = _pvd - # return; - self.pvd_previous_list[_row] = pvdata - self.val_previous[_row] = value - #self.pvd_previous_list[_row].ts[0] = _pvd.ts[0] - #self.pvd_previous_list[_row].ts[1] = _pvd.ts[1] - - self.databuffer[_row].append(value) - self.timebuffer[_row].append(self.time_delta[_row]) - - highest_ts = self.timebuffer[0][0] \ - if self.timebuffer[0][0] is not None else 0 - for i in range(1, len(self.timebuffer)): - if self.timebuffer[i][0] is None: - continue - elif self.timebuffer[i][0] > highest_ts: - highest_ts = self.timebuffer[i][0] - - if self.timebuffer[_row][0] is not None: - for i, val in enumerate(self.timebuffer[_row]): - if val > highest_ts: - self.idx[_row] = i - 1 - break - - self.y[_row][:] = self.databuffer[_row] - self.x[_row][:] = self.timebuffer[_row] - - idx = self.idx[_row] - self.x_shifted[_row] = list( - map(lambda m: (m - self.time_delta[_row]), self.x[_row][idx:])) - - self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:]) - - self.time_delta[_row] = ( - pvdata.ts[0] + pvdata.ts[1]*10**(-9)) - self.time_zero[0] - - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - - #self.pv_gateway.receive_monitor_update(value, status, alarm_severity) - _row = self.pv2item_dict[self.sender()] - - #print("row, value===>", _row, value, self.pv_gateway[_row].pv_name) - _pvd = self.pv_gateway[_row].cafe.getPVCache( - self.pv_gateway[_row].handle) - - #print("value", _pvd.value[0], self.pvd_previous_list[_row].value[0]) - - _pvd2 = self.pv_gateway[_row].pvd - - print("val", value, _pvd2.value[0], _pvd.value[0], - self.pvd_previous_list[_row].value[0]) - - ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9) - - ts_previous = (self.pvd_previous_list[_row].ts[0] + - self.pvd_previous_list[_row].ts[1] * 10**(-9)) - ts_delta = ts_now - ts_previous - - if value == self.val_previous[_row]: - _pvd.show() - - return - - - #discard first callbacks - #if ts_delta > 2.0: - # self.pvd_previous_list[_row] = _pvd - # return; - self.pvd_previous_list[_row] = _pvd2 - self.val_previous[_row] = value - #self.pvd_previous_list[_row].ts[0] = _pvd.ts[0] - #self.pvd_previous_list[_row].ts[1] = _pvd.ts[1] - - self.databuffer[_row].append(value) - self.timebuffer[_row].append(self.time_delta[_row]) - - highest_ts = self.timebuffer[0][0] \ - if self.timebuffer[0][0] is not None else 0 - for i in range(1, len(self.timebuffer)): - if self.timebuffer[i][0] is None: - continue - elif self.timebuffer[i][0] > highest_ts: - highest_ts = self.timebuffer[i][0] - - - if self.timebuffer[_row][0] is not None: - for i, val in enumerate(self.timebuffer[_row]): - if val > highest_ts: - self.idx[_row] = i - 1 - break - - - #for i in range(1, self.timebuffer): - # if self.timebuffer[i][0] is not None: - # a = self.timebuffer[0][0] - # for i, val in enumerate(self.timebuffer[_row]): - # if val > a: - # idx = i - 1 - # break - - - self.y[_row][:] = self.databuffer[_row] - self.x[_row][:] = self.timebuffer[_row] - - - #self.y[_row][:] = self.databuffer[_row] - #self.x[_row][:] = self.timebuffer[_row] - - ''' - #print(ts_delta, value, self.pvd_previous.value[0]) - #if (ts_delta < self.ts_delta_max) and (value < - #self.pvd_previous.value[0]) : - if (value < self.pvd_previous.value[0]) : - self.data_series_buffer.append(value) - self.time_series_buffer.append(ts_now - self.time_zero ) - self.y_series[:] = self.data_series_buffer - self.x_series[:] = self.time_series_buffer - #print(self.x_series, self.y_series) - #elif ts_delta < 1.0: - if len(self.data_series_buffer) > 15: - #x_series = np.array(self.time_series, dtype=np.float) - #y_series = np.array(self.data_series, dtype=np.float) - _x=self.x_series.reshape((-1, 1)) - - model = LinearRegression() - model.fit(_x, self.y_series) - r_sq = model.score(_x, self.y_series) - ###JCprint('coefficient of determination:', - ##r_sq, "slope", model.coef_ , "lifetime:", - ###self.y_series[0]/model.coef_ / 3600) - #print('intercept:', model.intercept_) - #print('slope:', model.coef_) - #print('max value', y_series[0], y_series[1]) - if r_sq > 0.995: - _I = self.y_series[0] - ###JCprint("lifetime:", _I/model.coef_ / 3600) - - - y_pred = model.predict(_x) - #print("len, y_pred, _x", len(y_pred), - #len(self.y_series), len(_x)) - #print('predicted response:', y_pred, sep='\n') - m_sq_error = mean_squared_error(self.y_series, y_pred) - #print('Mean squared error: {0:.9f}'.format( - # mean_squared_error(y_series, y_pred))) - #print('Coefficient of determination: {0:.9f}'.format( - # r2_score(y_series, y_pred))) - - - - self.trigger_series_sequence.emit(self.x_series, - self.y_series) - #print("emit") - self.data_series = [] - self.time_series = [] - #print(len(self.x_series), len(self.y_series)) - else: - self.data_series = [] - self.time_series = [] - - ''' - - - #dt = (self.x[-1] - self.x[-2]) - #print("dt", dt) - #Lowet IPCT before trigger is set to t=0 - idx = self.idx[_row] - self.x_shifted[_row] = list( - map(lambda m: (m - self.time_delta[_row]), self.x[_row][idx:])) - - ##self.y = np.where(self.y != self.y, 0, self.y) #test for nan - - #print("row len len ", _row, self.time_delta[0], self.time_delta[1]) - - - self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:]) - - self.time_delta[_row] = ( - _pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero[0] - - - ''' - LOOK_BACK = -800 - if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name: - LOOK_BACK = -250 - - if value > self.y[-2]: - if not self.found: - #print(x_shifted[-240:], self.y[-240:]) - #self.y = np.where(self.y != self.y, 0, self.y) #test for nan - max_index = self.y[LOOK_BACK:].argmax() - - if max_index == 0: - return - print("max index=", max_index) - - #print(x_shifted[-600+max_index:], self.x[-600+max_index:]) - #print(self.y[-600+max_index:-2]) - self.found = True - #print("Are Signals blocked??", self.signalsBlocked()) - self.trigger_decay_sequence.emit(np.array( - x_shifted[LOOK_BACK+max_index+9:-2]), - self.y[LOOK_BACK+max_index+9:-2]) - else: - self.found = False - ''' - - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - print("pv_name==>", pv_name) - - _row = self.pv2item_dict[self.sender()] - self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, - post_display=False) - - #self.pv_gateway.receive_connect_update(handle, pv_name, status) - _pvd = self.pv_gateway[_row].cafe.getPVCache( - self.pv_gateway[_row].handle) - if self.time_zero[_row] == 0: - self.time_zero[_row] = _pvd.ts[0] + _pvd.ts[1]*10**(-9) - - self.pvd_previous = _pvd - - - #renove highlighting which persists after mouse leaves - def mouseMoveEvent(self, event): - pass - - def leaveEvent(self, event): - self.clearFocus() - del event - - -class CAQPCTChart(PlotWidget): - '''Channel access enabled pyqtgraph.PlotWidget''' - #trigger_monitor_float = Signal(float, int, int) - #trigger_monitor_int = Signal(int, int, int) - #trigger_monitor_str = Signal(str, int, int) - - #trigger_connect = Signal(int, str, int) - - trigger_decay_sequence = Signal(np.ndarray, np.ndarray) - trigger_series_sequence = Signal(np.ndarray, np.ndarray) - #def py_connect_callback(self, handle, pvname, status): - # self.trigger_connect.emit(int(handle), str(pvname), int(status)) - # print("py connect callback", handle, pvname, status) - - def daq_start(self): - self.blockSignals(False) - - def daq_pause(self): - self.blockSignals(True) - - def daq_stop(self): - self.blockSignals(True) - - def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'], - monitor_callback=None, pv_within_daq_group: bool = False, - color_mode=None, show_units: bool = False, prefix: str = "", - suffix: str = "", notify_freq_hz: int = 0): - super().__init__() - - self.found = False - self.time_zero = 0 - self.time_delta = 0 - self.pv_list = pv_list - self.pv2item_dict = {} - self.pv_gateway = [None] * len(self.pv_list) - self.pvd_previous = None - - for i in range(0, len(self.pv_list)): - self.pv_gateway[i] = PVGateway( - parent, pv_list[i], monitor_callback, pv_within_daq_group, - color_mode, show_units, prefix, suffix, - #connect_callback=self.py_connect_callback, - connect_triggers=False, notify_freq_hz=notify_freq_hz) - - self.pv_gateway[i].is_initialize_complete() - - self.pv_gateway[i].trigger_connect.connect( - self.receive_connect_update) - - self.pv_gateway[i].trigger_monitor_str.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_int.connect( - self.receive_monitor_update) - self.pv_gateway[i].trigger_monitor_float.connect( - self.receive_monitor_update) - - self.pv_gateway[i].widget_class = "PlotWidget" - - self.pv2item_dict[self.pv_gateway[i]] = i - - self.cafe = self.pv_gateway[0].cafe - self.cyca = self.pv_gateway[0].cyca - for i in range(0, len(self.pv_gateway)): - if self.cafe.isConnected(self.pv_gateway[i].pv_name): - self.pv_gateway[i].trigger_connect.emit( - self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), - self.pv_gateway[i].cyca.ICAFE_CS_CONN) - - for i in range(0, len(self.pv_gateway)): - if not self.pv_gateway[i].pv_within_daq_group: - self.pv_gateway[i].monitor_start() - - sampleinterval = 0.333 - timewindow = 1800.0 - - self.ts_delta_max = 0.6 - - # Data stuff - self._interval = int(sampleinterval*1000) - self._bufsize = int(timewindow/sampleinterval) - self.databuffer = collections.deque([None]*self._bufsize, self._bufsize) - self.timebuffer = collections.deque([0]*self._bufsize, self._bufsize) - - _long_size = 20 - self.data_series_buffer = collections.deque([0]*_long_size, _long_size) - self.time_series_buffer = collections.deque([0]*_long_size, _long_size) - - self.data_series = [] - self.time_series = [] - - self.iflag_series = 0 - - #self.x = np.linspace(-timewindow, 0.0, self._bufsize) - self.x = np.zeros(self._bufsize, dtype=np.float) - self.y = np.zeros(self._bufsize, dtype=np.float) - - self.x_series = np.zeros(_long_size, dtype=np.float) - self.y_series = np.zeros(_long_size, dtype=np.float) - - self.setTitle("PCT(t)") #self.pv_gateway[0].pv_name) - self.showGrid(x=True, y=True) - self.setLabel('left', 'I', 'mA') - self.setLabel('bottom', 'time', 's') - self.setBackground((60, 60, 60)) #247, 236, 249)) - self.setLimits(yMin=-0.11) - - self.plotItem.setMouseEnabled(y=False) # Only allow zoom in X-axis - self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis - - self.curve = self.plot(self.x, self.y, pen=(125, 249, 255)) - #self.curve2 = self.plot(self.x, self.y, pen=(255,255,0)) - - l = pg.LegendItem(offset=(0., 0.5)) - l.setParentItem(self.graphicsItem()) - l.setLabelTextColor((125, 249, 255)) - - l.addItem(self.curve, str(self.pv_gateway[0].pv_name)) - ''' - l2=self.addLegend() - l2.setLabelTextColor('g') - l2.setOffset(10) - l2.addItem(self.curve2, str(self.pv_gateway[0].pv_name)) - ''' - self.daq_stop() - print(self._bufsize) - print(len(self.x), len(self.y)) - - - - @Slot(str, int, int) - @Slot(int, int, int) - @Slot(float, int, int) - def receive_monitor_update(self, value, status, alarm_severity): - - #self.pv_gateway.receive_monitor_update(value, status, alarm_severity) - _row = self.pv2item_dict[self.sender()] - #print("value===>", value, self.pv_gateway[_row].pv_name) - _pvd = self.pv_gateway[_row].cafe.getPVCache( - self.pv_gateway[_row].handle) - - ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9) - ts_previous = self.pvd_previous.ts[0] + self.pvd_previous.ts[1]*10**(-9) - ts_delta = ts_now - ts_previous - - if (_pvd.ts[0] == self.pvd_previous.ts[0]) and ( - _pvd.ts[1] == self.pvd_previous.ts[1]): - #_pvd.show() - return - - #discard first callbacks - if ts_delta > 2.0: - self.pvd_previous = _pvd - return - - self.databuffer.append(value) - self.y[:] = self.databuffer - self.timebuffer.append(self.time_delta) - self.x[:] = self.timebuffer - - #print(ts_delta, value, self.pvd_previous.value[0]) - #if (ts_delta < self.ts_delta_max) and (value < - # self.pvd_previous.value[0]): - if value < self.pvd_previous.value[0]: - self.data_series_buffer.append(value) - self.time_series_buffer.append(ts_now - self.time_zero) - self.y_series[:] = self.data_series_buffer - self.x_series[:] = self.time_series_buffer - #print(self.x_series, self.y_series) - #elif ts_delta < 1.0: - if len(self.data_series_buffer) > 15: - #x_series = np.array(self.time_series, dtype=np.float) - #y_series = np.array(self.data_series, dtype=np.float) - _x = self.x_series.reshape((-1, 1)) - - model = LinearRegression() - model.fit(_x, self.y_series) - r_sq = model.score(_x, self.y_series) - ###JCprint('coefficient of determination:', - ###r_sq, "slope", model.coef_ , "lifetime:", - ###self.y_series[0]/model.coef_ / 3600) - #print('intercept:', model.intercept_) - #print('slope:', model.coef_) - #print('max value', y_series[0], y_series[1]) - if r_sq > 0.995: - #_I = self.y_series[0] - - - ###JCprint("lifetime:", _I/model.coef_ / 3600) - - ####y_pred = model.predict(_x) - #print("len, y_pred, _x", len(y_pred), len(self.y_series), - # len(_x)) - #print('predicted response:', y_pred, sep='\n') - ##m_sq_error = mean_squared_error(self.y_series, y_pred) - #print('Mean squared error: {0:.9f}'.format( - # mean_squared_error(y_series, y_pred))) - #print('Coefficient of determination: {0:.9f}'.format( - # r2_score(y_series, y_pred))) - - - self.trigger_series_sequence.emit(self.x_series, - self.y_series) - #print("emit") - self.data_series = [] - self.time_series = [] - #print(len(self.x_series), len(self.y_series)) - else: - self.data_series = [] - self.time_series = [] - - - self.pvd_previous = _pvd - - #dt = (self.x[-1] - self.x[-2]) - #print("dt", dt) - #Lowet IPCT before trigger is set to t=0 - x_shifted = list(map(lambda m: (m - self.time_delta), self.x)) - - ##self.y = np.where(self.y != self.y, 0, self.y) #test for nan - self.curve.setData(x_shifted, self.y) - - self.time_delta = ( - _pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero - #x_shifted2= list(map(lambda m : m -self.time_delta-1 , self.x)) - #self.curve2.setData(x_shifted2, self.y) - #QApplication.processEvents() - #print(type(x_shifted), type(self.y), type([1.1]), type(1.1)) - - LOOK_BACK = -800 - if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name: - LOOK_BACK = -250 - - if value > self.y[-2]: - if not self.found: - #print(x_shifted[-240:], self.y[-240:]) - #self.y = np.where(self.y != self.y, 0, self.y) #test for nan - max_index = self.y[LOOK_BACK:].argmax() - - if max_index == 0: - return - print("max index=", max_index) - - #print(x_shifted[-600+max_index:], self.x[-600+max_index:]) - #print(self.y[-600+max_index:-2]) - self.found = True - #print("Are Signals blocked??", self.signalsBlocked()) - self.trigger_decay_sequence.emit( - np.array(x_shifted[LOOK_BACK+max_index+9:-2]), - self.y[LOOK_BACK+max_index+9:-2]) - else: - self.found = False - - - - - @Slot(int, str, int) - def receive_connect_update(self, handle: int, pv_name: str, status: int): - '''Triggered by connect signal''' - print("pv_name==>", pv_name) - - _row = self.pv2item_dict[self.sender()] - self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, - post_display=False) - - #self.pv_gateway.receive_connect_update(handle, pv_name, status) - _pvd = self.pv_gateway[_row].cafe.getPVCache( - self.pv_gateway[_row].handle) - if self.time_zero == 0: - self.time_zero = _pvd.ts[0] + _pvd.ts[1]*10**(-9) - #print(self.time_zero) - self.pvd_previous = _pvd - - - #renove highlighting which persists after mouse leaves - def mouseMoveEvent(self, event): - pass - - def leaveEvent(self, event): - self.clearFocus() - del event