''' Module with channel access enabled QtWidgets.''' __author__ = 'Jan T. M. Chrin' import collections import numpy as np import re from threading import Lock import time from sklearn.linear_model import LinearRegression from distutils.version import LooseVersion from functools import reduce as func_reduce from qtpy.QtCore import ( QEventLoop, QMutex, QPoint, Qt, QThread, QTimer, Signal, Slot) from qtpy.QtGui import ( QCloseEvent, QColor, QCursor, QFont, QFontMetricsF, QIcon, QKeySequence) from qtpy.QtCore import __version__ as QT_VERSION_STR from qtpy.QtWidgets import ( QAbstractItemView, QAbstractSpinBox, QAction, QApplication, QBoxLayout, QCheckBox, QComboBox, QDialog, QDockWidget, QDoubleSpinBox, QFrame, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QListWidget, QMenu, QMessageBox, QPushButton, QSpinBox, QStyle, QStyleOptionSpinBox, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget) import pyqtgraph as pg from pyqtgraph import PlotWidget from caqtwidgets.pvgateway import PVGateway class QTaggedLineEdit(QWidget): def __init__(self, label_text=str(""), value="", position="LEFT", parent=None): super(QTaggedLineEdit, self).__init__(parent) self.parameter = str(value) self.label = QLabel(label_text) self.label.setObjectName("Tagged") self.label.setFixedHeight(24) self.label.setContentsMargins(10, 0, 0, 0) #self.label.setFixedWidth(80) self.line_edit = QLineEdit(self.parameter) self.line_edit.setObjectName("Write") self.line_edit.setFixedHeight(24) font = QFont("sans serif", 16) fm = QFontMetricsF(font) self.line_edit.setMaximumWidth(int(fm.width(self.parameter)+20)) self.label.setBuddy(self.line_edit) layout = QBoxLayout( QBoxLayout.LeftToRight if position == "LEFT" else \ QBoxLayout.TopToBottom) layout.addWidget(self.label) layout.addWidget(self.line_edit) layout.addStretch() layout.setSpacing(2) layout.setContentsMargins(0, 0, 0, 0) self.setLayout(layout) class QHLine(QFrame): def __init__(self): super(QHLine, self).__init__() self.setFrameShape(QFrame.HLine) self.setFrameShadow(QFrame.Sunken) class QVLine(QFrame): def __init__(self): super(QVLine, self).__init__() self.setFrameShape(QFrame.VLine) self.setFrameShadow(QFrame.Sunken) class AppQLineEdit(QLineEdit): def __init__(self, parent=None): #super().__init__(parent) pass def leaveEvent(self, event): self.clearFocus() del event class CAQLineEdit(QLineEdit, PVGateway): '''Channel access enabled QLineEdit widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) trigger_daq = Signal(object, str, int) trigger_daq_int = Signal(object, str, int) trigger_daq_str = Signal(object, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = False, prefix: str = "", suffix: str = "", notify_freq_hz: int = 0, precision: int = 0): super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, connect_callback=self.py_connect_callback, notify_freq_hz=notify_freq_hz, precision=precision) self.is_initialize_complete() self.configure_widget() if not self.pv_within_daq_group: self.monitor_start() def py_connect_callback(self, handle, pvname, status): '''Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(object, str, int) def receive_daq_update(self, daq_pvd, daq_mode, daq_state): PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.setFocusPolicy(Qt.NoFocus) fm = QFontMetricsF(QFont("Sans Serif", 10)) qrect = fm.boundingRect(self.suggested_text) _width_scaling_factor = 1.15 self.setFixedHeight(int(fm.lineSpacing()*1.8)) self.setFixedWidth(int(qrect.width() * _width_scaling_factor)) if self.pv_within_daq_group: self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA) else: self.qt_property_initial_values(qt_object_name=self.PV_READBACK) #renove highlighting which persists after mouse leaves def mouseMoveEvent(self, event): #event.ignore() pass def leaveEvent(self, event): self.clearFocus() del event class CAQLabel(QLabel, PVGateway): '''Channel access enabled QLabel widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) trigger_daq = Signal(object, str, int) trigger_daq_int = Signal(object, str, int) trigger_daq_str = Signal(object, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = False, prefix: str = "", suffix: str = "", notify_freq_hz: int = 0, precision: int = 0, scale_factor: float = 1): super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, connect_callback=self.py_connect_callback, notify_freq_hz=notify_freq_hz, precision=precision) self.scale_factor = scale_factor self.is_initialize_complete() self.configure_widget() if self.pv_within_daq_group is False: self.monitor_start() def py_connect_callback(self, handle, pvname, status): '''Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(object, str, int) def receive_daq_update(self, daq_pvd, daq_mode, daq_state): if self.scale_factor != 1: daq_pvd.value[0] = daq_pvd.value[0] * self.scale_factor PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): if self.scale_factor != 1: value = value * self.scale_factor PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.setFocusPolicy(Qt.NoFocus) fm = QFontMetricsF(QFont("Sans Serif", 10)) qrect = fm.boundingRect(self.suggested_text) _width_scaling_factor = 1.15 self.setFixedHeight(int(fm.lineSpacing()*1.8)) self.setFixedWidth(int(qrect.width() * _width_scaling_factor)) if self.pv_within_daq_group: self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA) else: self.qt_property_initial_values(qt_object_name=self.PV_READBACK) #For use with CAQMenu class QLineEditExtended(QLineEdit): def __init__(self, parent=None): super().__init__(parent) self.parent = parent def mousePressEvent(self, event): button = event.button() if button == Qt.RightButton: self.parent.showContextMenu() elif button == Qt.LeftButton: self.parent.mousePressEvent(event) class CAQMenu(QComboBox, PVGateway): '''Channel access enabled QMenu widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units=False, prefix: str = "", suffix: str = ""): super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, connect_callback=self.py_connect_callback) self.is_initialize_complete() self.configure_widget() #After configure:widget self.currentIndexChanged.connect(self.value_change) if self.pv_within_daq_group is False: self.monitor_start() def py_connect_callback(self, handle, pvname, status): '''Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) def configure_widget(self): self.previousIndex = None self.setFocusPolicy(Qt.NoFocus) self.setEditable(True) self.setLineEdit(QLineEditExtended(self)) self.lineEdit().setReadOnly(True) self.lineEdit().setAlignment(Qt.AlignCenter) enumStringList = self.cafe.getEnumStrings(self.handle) self.addItems(enumStringList) for i in range(0, self.count()): self.setItemData(i, Qt.AlignCenter, Qt.TextAlignmentRole) fm = QFontMetricsF(QFont("Sans Serif", 10)) qrect = fm.boundingRect(self.suggested_text) _width_scaling_factor = 1.1 self.setFixedHeight(round(fm.lineSpacing()*1.8)) self.setFixedWidth(round((qrect.width()+40) * _width_scaling_factor)) self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) def post_display_value(self, value): '''Convert value to index''' if "setCurrentIndex" in dir(self): if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) if isinstance(value, str): self.setCurrentIndex(self.cafe.getEnumFromString(self.handle, value)) elif isinstance(value, int): self.setCurrentIndex(value) #Should not happen elif isinstance(value, float): self.setCurrentIndex(int(value)) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) #self.previousIndex = self.currentIndex() return else: print(("ERROR: overloaded post_display_value: 'setCurrentIndex' " "method does not exist!")) def value_change(self, indx): status = self.cafe.set(self.handle, indx) if status != self.cyca.ICAFE_NORMAL: #self.showSetErrorMsg(status) value = self.cafe.getCache(self.handle, 'int') if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) if value is not None: self.setCurrentIndex(value) else: if self.previousIndex is not None: self.setCurrentIndex(self.previousIndex) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) self.pv_message_in_a_box.setText( "CAQMenu set operation reports error:\n{0}".format( self.cafe.getStatusCodeAsString(status))) self.pv_message_in_a_box.exec() def mousePressEvent(self, event): button = event.button() if button == Qt.RightButton: PVGateway.mousePressEvent(self, event) elif self.pv_info is not None: if self.pv_info.accessWrite == 0: event.ignore() return QComboBox.mousePressEvent(self, event) self.previousIndex = self.currentIndex() def enterEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 0: for i in range(0, self.count()): self.setItemIcon(i, QIcon(":/forbidden.png")) self.setStyleSheet( ("QComboBox {background: transparent}" + "QComboBox::drop-down {image: url(:/forbidden.png)}")) def leaveEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 0: for i in range(0, self.count()): self.setItemIcon(i, QIcon()) self.setStyleSheet( "QComboBox::drop-down {background: transparent}") #The widget should not gain focus by using the mouse wheel. #This is accomplished by setting the focus policy to Qt.StrongFocus. #The widget should only accept wheel events if it already has the #focus. This is accomplished by reimplementing QWidget.wheelEvent #within a QSpinBox subclass: def wheelEvent(self, event): if self.hasFocus() is False: event.ignore() else: QComboBox.wheelEvent(self, event) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): '''Triggered by monitor signal''' PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) class CAQMessageButton(QPushButton, PVGateway): trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, notify_freq_hz: int = 0, pv_within_daq_group: bool = False, color_mode=None, show_units=False, msg_label: str = "", msg_press_value=None, msg_release_value=None, start_monitor=False): super().__init__(parent=parent, pv_name=pv_name, monitor_callback=monitor_callback, notify_freq_hz=notify_freq_hz, pv_within_daq_group=pv_within_daq_group, color_mode=color_mode, show_units=show_units, msg_label=msg_label, connect_callback=self.py_connect_callback) self.msg_press_value = msg_press_value self.msg_release_value = msg_release_value if self.msg_press_value is not None: self.pressed.connect(self.act_on_pressed) if self.msg_release_value is not None: self.released.connect(self.act_on_released) self.msg_label = msg_label self.suggested_text = self.msg_label _suggested_text_length = len(self.suggested_text)+3 self.suggested_text = self.suggested_text.rjust(_suggested_text_length, "^") self.configure_widget() self.msg_press_status = self.cyca.ICAFE_NORMAL self.msg_release_status = self.cyca.ICAFE_NORMAL self.msg_report_status = "PV={0}\n".format(self.pv_name) self.msg_has_error = False if not self.pv_within_daq_group and start_monitor: self.monitor_start() def py_connect_callback(self, handle, pvname, status): '''Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.setFocusPolicy(Qt.StrongFocus) self.setCheckable(True) #Recognizes press and release states fm = QFontMetricsF(QFont("Sans Serif", 10)) qrect = fm.boundingRect(self.suggested_text) _width_scaling_factor = 1.0 self.setText(self.msg_label) self.setFixedHeight(round(fm.lineSpacing()*1.8)) self.setFixedWidth(round(qrect.width() * _width_scaling_factor)) self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) def enterEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 0: self.setProperty("readOnly", True) self.qt_style_polish() def leaveEvent(self, event): if self.property("readOnly"): self.setProperty(self.qt_dynamic_property_get(), True) self.qt_style_polish() def mouseReleaseEvent(self, event): if self.msg_release_value is not None: time.sleep(0.1) QPushButton.mouseReleaseEvent(self, event) def mousePressEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 1: QPushButton.mousePressEvent(self, event) if event.button() == Qt.RightButton: PVGateway.mousePressEvent(self, event) def act_on_pressed(self): if self.msg_press_value is not None: self.msg_press_status = self.cafe.set(self.handle, self.msg_press_value) if self.msg_press_status != self.cyca.ICAFE_NORMAL: self.msg_report_status += ( "Error in set operation (at press button):\n{0}\n".format( self.cafe.getStatusCodeAsString(self.msg_press_status))) self.msg_has_error = True qm = QMessageBox() qm.setText(self.msg_report_status) qm.exec() QApplication.processEvents() def act_on_released(self): if self.msg_release_value is not None: self.msg_release_status = self.cafe.set(self.handle, self.msg_release_value) if self.msg_release_status != self.cyca.ICAFE_NORMAL: self.msg_report_status += ( "Error in set operation (at release button):\n{0}\n".format( self.cafe.getStatusCodeAsString(self.msg_release_status))) self.msg_has_error = True if self.msg_has_error: self.msg_has_error = False self.pv_message_in_a_box.setText(self.msg_report_status) self.pv_message_in_a_box.exec() self.msg_report_status = "PV={0}\n".format(self.pv_name) qm = QMessageBox() qm.setText(self.msg_report_status) qm.exec() QApplication.processEvents() class CAQTextEntry(QLineEdit, PVGateway): '''Channel access enabled QTextEntry widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) def __init__(self, parent=None, pv_name: str="", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units=False, prefix: str="", suffix: str="", precision: int=0): super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, connect_callback=self.py_connect_callback, precision=precision) self.is_initialize_complete() #waits a fraction of a second self.currentText = "" self.returnPressed.connect(self.valuechange) self.configure_widget() if self.pv_within_daq_group is False: self.monitor_start() def py_connect_callback(self, handle, pvname, status): '''Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): #print ("FONT-1", self.font().pixelSize(), value) #print ("FONT-1", self.font().pointSize(), value) #self.setFont(QFont("Sans Serif", 8)) #QLineEdit.setFont(self, QFont("Sans Serif", 8)) #self.font().setPixelSize(12) #QLineEdit.font().setPixelSize(8) PVGateway.receive_monitor_update(self, value, status, alarm_severity) #self.font().setPixelSize(12) #print ("FONT-3", self.font().pixelSize(), value) #print ("FONT-3", self.font().pointSize(), value) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.setFocusPolicy(Qt.StrongFocus) #self.setFont(QFont("Sans Serif", 12)) #QLineEdit.setFont(self, QFont("Sans Serif", 8)) #f = QFont("Sans Serif") #f.setPixelSize(11) #self.setFont(f) #fm = QFontMetricsF(f) #As for CAQLabel QLineEdit.setFont(self, QFont("Sans Serif", 12)) fm = QFontMetricsF(QFont("Sans Serif", 9)) qrect = fm.boundingRect(self.suggested_text) _width_scaling_factor = 1.2 self.setFixedHeight(round(fm.lineSpacing()*1.8)) self.setFixedWidth(round(qrect.width()+20 * _width_scaling_factor)) self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) def valuechange(self): status = self.cafe.set(self.handle, self.text()) if status != self.cyca.ICAFE_NORMAL: if self.cafe.getNoMonitors(self.handle) == 0: val = self.cafe.get(self.handle, 'native') else: val = self.cafe.getCache(self.handle, 'native') if val is not None: if isinstance(val, str): strText = val else: valStr = ("{: .%sf}" % self.precision) strText = valStr.format(round(val, self.precision)) #print(strText, " precision ", self.precision) self.setText(strText) else: #Do this for TextInfo cache if self.cafe.getNoMonitors(self.handle) == 0: val = self.cafe.get(self.handle, 'native') def setText(self, value): QLineEdit.setText(self, value) self.currentText = self.text() #print ("FONT-2", self.font().pixelSize(), value) #print ("FONT-2", self.font().pointSize(), value) def enterEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 0: self.setProperty("readOnly", True) self.qt_style_polish() self.setReadOnly(True) self.setFocusPolicy(Qt.StrongFocus) def leaveEvent(self, event): if self.isReadOnly(): self.setReadOnly(False) self.setProperty(self.qt_dynamic_property_get(), True) self.qt_style_polish() if self.text() != self.currentText: QLineEdit.setText(self, self.currentText) self.setCursorPosition(100) self.clearFocus() self.setFocusPolicy(Qt.NoFocus) del event def mousePressEvent(self, event): if event.button() == Qt.RightButton: PVGateway.mousePressEvent(self, event) self.clearFocus() return local_event_position = QPoint(event.x(), event.y()) local_cursor_position = self.cursorPositionAt(local_event_position) self.setCursorPosition(local_cursor_position) class CAQSpinBox(QSpinBox, PVGateway): '''Channel access enabled QSpinBox widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units=False, prefix: str = "", suffix: str = ""): super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, connect_callback=self.py_connect_callback) self.is_initialize_complete() self.valueChanged.connect(self.value_change) self.configure_widget() if not self.pv_within_daq_group: self.monitor_start() def py_connect_callback(self, handle, pvname, status): ''' Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.previousValue = None self.currentValue = None self.setFocusPolicy(Qt.StrongFocus) self.setButtonSymbols(QAbstractSpinBox.UpDownArrows) self.setAccelerated(False) self.setLineEdit(QLineEditExtended(self)) self.lineEdit().setEnabled(True) self.lineEdit().setReadOnly(False) self.lineEdit().setAlignment(Qt.AlignLeft) self.lineEdit().setFont(QFont("Sans Serif", 16)) fm = QFontMetricsF(QFont("Sans Serif", 12)) _suggested_text = self.max_control_abs_str _added_text = "" if self.show_units: _added_text += " " + self.units _suggested_text += self.units if self.suffix: _added_text += " " + self.suffix _suggested_text += self.suffix self.setSuffix(_added_text) qrect = fm.boundingRect(_suggested_text) _width_scaling_factor = 1.0 self.setFixedHeight(int(fm.lineSpacing()*1.8)) self.setFixedWidth(int(qrect.width() * _width_scaling_factor)) self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) if self.pv_ctrl is not None: self.setRange(int(self.pv_ctrl.lowerControlLimit), int(self.pv_ctrl.upperControlLimit)) def post_display_value(self, value): '''Convert value to index''' if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) self.setValue(int(round(value))) self.blockSignals(False) else: self.setValue(int(round(value))) def mousePressEvent(self, event): _opt = QStyleOptionSpinBox() self.initStyleOption(_opt) _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt, QStyle.SC_SpinBoxUp, self) _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt, QStyle.SC_SpinBoxDown, self) self.previousValue = self.value() if event.button() == Qt.LeftButton: if _rect_up.contains(event.pos(), proper=True) or \ _rect_down.contains(event.pos(), proper=True): if not self.cafe.isConnected(self.handle): self.pv_message_in_a_box.setText( ("Spinbox change value events currently suspended\n" + "as channel {0} is disconnected.").format( self.pv_name)) self.pv_message_in_a_box.exec() return QSpinBox.mousePressEvent(self, event) #Clear Focus: only one step per mouse click. self.clearFocus() local_event_position = QPoint(event.x(), event.y()) local_cursor_position = self.lineEdit().cursorPositionAt( local_event_position) self.lineEdit().setCursorPosition(local_cursor_position) PVGateway.mousePressEvent(self, event) def setValue(self, intVal): QSpinBox.setValue(self, intVal) self.currentValue = self.value() def value_change(self, intVal): status = self.cafe.set(self.handle, intVal) if status != self.cyca.ICAFE_NORMAL: if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) if self.previousValue is not None: self.setValue(self.previousValue) else: _value = self.cafe.getCache(self.handle, 'int') if _value is not None: self.setValue(_value) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) self.pv_message_in_a_box.setText( ("Spinbox set operation reports error:\n{0}" .format(self.cafe.getStatusCodeAsString(status)))) self.pv_message_in_a_box.exec() else: if self.previousValue is not None: self.setValue(self.previousValue) else: _value = self.cafe.getCache(self.handle, 'int') if _value is not None: self.setValue(_value) self.parent.statusbar.showMessage( (self.widget_class + " " + self.cafe.getStatusCodeAsString(status))) def enterEvent(self, event): if self.pv_info is not None: if self.pv_info.accessWrite == 0: self.setProperty("readOnly", True) self.qt_style_polish() self.setReadOnly(True) self.setFocusPolicy(Qt.StrongFocus) def leaveEvent(self, event): if self.isReadOnly(): self.setReadOnly(False) self.setProperty(self.qt_dynamic_property_get(), True) self.qt_style_polish() self.clearFocus() self.setFocusPolicy(Qt.NoFocus) del event def keyPressEvent(self, event): if event.key() in (Qt.Key_Return, Qt.Key_Enter): QSpinBox.keyPressEvent(self, event) self.clearFocus() elif event.key() in (Qt.Key_Up, Qt.Key_Down): QSpinBox.keyPressEvent(self, event) else: if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) QSpinBox.keyPressEvent(self, event) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) # The spin box should not gain focus by using the mouse wheel. # This is accomplished by setting the focus policy to Qt.StrongFocus. # The spin box should only accept wheel events if it already has the focus. # This is accomplished by reimplementing QWidget.wheelEvent within a # QSpinBox subclass: def wheelEvent(self, event): #print("wheelEvent", self.hasFocus()) if self.hasFocus() is False: event.ignore() else: QSpinBox.wheelEvent(self, event) class CAQDoubleSpinBox(QDoubleSpinBox, PVGateway): '''Channel access enabled QDoubleSpinBox widget''' trigger_monitor_float = Signal(float, int, int) trigger_monitor_int = Signal(int, int, int) trigger_monitor_str = Signal(str, int, int) trigger_monitor = Signal(object, int) trigger_connect = Signal(int, str, int) def __init__(self, parent=None, pv_name: str = "", monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = False, prefix: str = "", suffix: str = ""): super().__init__(parent=parent, pv_name=pv_name, monitor_callback=monitor_callback, pv_within_daq_group=pv_within_daq_group, color_mode=color_mode, show_units=show_units, prefix=prefix, suffix=suffix, connect_callback=self.py_connect_callback) self.is_initialize_complete() self.valueChanged.connect(self.valuechange) self.configure_widget() if self.pv_within_daq_group is False: self.monitor_start() def py_connect_callback(self, handle, pvname, status): ''' Callback function to be invoked on change of pv connection status. ''' self.trigger_connect.emit(int(handle), str(pvname), int(status)) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): PVGateway.receive_monitor_update(self, value, status, alarm_severity) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' PVGateway.receive_connect_update(self, handle, pv_name, status) def configure_widget(self): self.previousValue = None self.currentValue = None self.setFocusPolicy(Qt.StrongFocus) self.setButtonSymbols(QAbstractSpinBox.UpDownArrows) self.setAccelerated(False) self.setLineEdit(QLineEditExtended(self)) self.lineEdit().setReadOnly(False) self.lineEdit().setAlignment(Qt.AlignRight) self.lineEdit().setFont(QFont("Sans Serif", 12)) _stepsize = 10**(self.precision * -1) self.setSingleStep(_stepsize) self.setDecimals(self.precision) fm = QFontMetricsF(QFont("Sans Serif", 12)) _suggested_text = self.suggested_text _added_text = "" if self.show_units: _added_text += " " + self.units _suggested_text += self.units if self.suffix: _added_text += " " + self.suffix _suggested_text += self.suffix self.setSuffix(_added_text) qrect = fm.boundingRect(_suggested_text) _width_scaling_factor = 1.15 self.setFixedHeight(int(fm.lineSpacing()*1.8)) self.setFixedWidth(int(qrect.width() * _width_scaling_factor)) self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER) if self.pv_ctrl is not None: self.setRange(int(self.pv_ctrl.lowerControlLimit), int(self.pv_ctrl.upperControlLimit)) def post_display_value(self, value): '''set value from monitor''' if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) self.setValue(value) self.blockSignals(False) else: self.setValue(value) def mousePressEvent(self, event): _opt = QStyleOptionSpinBox() self.initStyleOption(_opt) _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt, QStyle.SC_SpinBoxUp, self) _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt, QStyle.SC_SpinBoxDown, self) self.previousValue = self.value() if event.button() == Qt.LeftButton: if _rect_up.contains(event.pos(), proper=False) or \ _rect_down.contains(event.pos(), proper=False): if not self.cafe.isConnected(self.handle): self.pv_message_in_a_box.setText( ("Spinbox change value events currently suspended\n" + "as channel {0} is disconnected.").format( self.pv_name)) self.pv_message_in_a_box.exec() return QDoubleSpinBox.mousePressEvent(self, event) local_event_position = QPoint(event.x(), event.y()) local_cursor_position = self.lineEdit().cursorPositionAt( local_event_position) self.lineEdit().setCursorPosition(local_cursor_position) PVGateway.mousePressEvent(self, event) def mouseReleaseEvent(self, event): self.clearFocus() self.lineEdit().clearFocus() def setValue(self, value): self.currentValue = self.value() QDoubleSpinBox.setValue(self, value) def valuechange(self, fval): status = self.cafe.set(self.handle, fval) if status != self.cyca.ICAFE_NORMAL: if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) if self.previousValue is not None: self.setValue(self.previousValue) else: _value = self.cafe.getCache(self.handle, 'float') if _value is not None: self.setValue(_value) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) self.pv_message_in_a_box.setText( ("Spinbox set operation reports error:\n{0}" .format(self.cafe.getStatusCodeAsString(status)))) self.pv_message_in_a_box.exec() else: if self.previousValue is not None: self.setValue(self.previousValue) else: _value = self.cafe.getCache(self.handle, 'float') if _value is not None: self.setValue(_value) self.parent.statusbar.showMessage( (self.widget_class + " " + self.cafe.getStatusCodeAsString(status))) def enterEvent(self, event): self.setFocusPolicy(Qt.StrongFocus) if self.pv_info is not None: if self.pv_info.accessWrite == 0: self.setProperty("readOnly", True) self.qt_style_polish() self.setReadOnly(True) def leaveEvent(self, event): if self.isReadOnly(): self.setReadOnly(False) self.setProperty(self.qt_dynamic_property_get(), True) self.qt_style_polish() self.clearFocus() self.lineEdit().clearFocus() self.lineEdit().setFocusPolicy(Qt.NoFocus) self.setFocusPolicy(Qt.NoFocus) del event def keyPressEvent(self, event): if event.key() in (Qt.Key_Return, Qt.Key_Enter): QDoubleSpinBox.keyPressEvent(self, event) self.clearFocus() self.lineEdit().clearFocus() self.lineEdit().setFocusPolicy(Qt.NoFocus) self.setFocusPolicy(Qt.NoFocus) elif event.key() in (Qt.Key_Up, Qt.Key_Down): QDoubleSpinBox.keyPressEvent(self, event) else: if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(True) QDoubleSpinBox.keyPressEvent(self, event) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.blockSignals(False) # The spin box should not gain focus by using the mouse wheel. # This is accomplished by setting the focus policy to Qt.StrongFocus. # The spin box should only accept wheel events if it already has the focus. # This is accomplished by reimplementing QWidget.wheelEvent within a # QSpinBox subclass: def wheelEvent(self, event): if self.hasFocus() is False: event.ignore() else: QDoubleSpinBox.wheelEvent(self, event) class reconnectQPushButton(QPushButton, QThread): def __init__(self, parent=None): super().__init__() self.parent = parent self.clicked.connect(self.onClicked) self.isdirty = False self._handles_to_reconnect = [] self.reconnectThread = None def onClicked(self, event): self._handles_to_reconnect = [] for i in range(0, len(self.parent.pv_gateway)): if self.parent.item( i, self.parent.no_columns-1).checkState() == Qt.Checked: self._handles_to_reconnect.append( self.parent.pv_gateway[i].handle) self.reconnect() QApplication.processEvents() def reconnect(self): QApplication.processEvents() self.isdirty = True if self._handles_to_reconnect: self.parent.cafe.reconnect(self._handles_to_reconnect) self.isdirty = False #Uncheck reconnected channels for i in range(0, len(self.parent.pv_gateway)): if self.parent.item( i, self.parent.no_columns-1).checkState() == Qt.Checked: if self.parent.cafe.isConnected( self.parent.pv_gateway[i].handle): self.parent.item( i, self.parent.no_columns-1).setCheckState(False) else: #Even if not connected - uncheck box as action is complete self.parent.item( i, self.parent.no_columns-1).setCheckState(False) #Uncheck global reconnect check box self.parent.cb_item_all.setCheckState(Qt.Unchecked) class CAQTableWidget(QTableWidget): '''Channel access enabled QTableWidget widget''' #trigger_monitor_float = Signal(float, int, int) #trigger_monitor_int = Signal(int, int, int) #trigger_monitor_str = Signal(str, int, int) #trigger_connect = Signal(int, str, int) def hasNewData(self, _row, pv_data): if self.pv_gateway[_row].pvd_previous is None: return True newDataFlag = False if self.pv_gateway[_row].pvd_previous.ts[1] != pv_data.ts[1]: newDataFlag = True elif self.pv_gateway[_row].pvd_previous.ts[0] != pv_data.ts[0]: newDataFlag = True # Catch disconnect events(!!) and set newDataFlag only elif self.pv_gateway[_row].pvd_previous.status != pv_data.status: newDataFlag = True return newDataFlag def paint_rows(self, row_range: list=[], reset=False, last_row=[" ", " "], columns=[0]): _qcolor_last_line = QColor("#d1e8e9") self.font_pts8 = QTableWidgetItem().font() self.font_pts8.setPointSize(8) if not row_range: row_range = [0, self.rowCount()-1] if reset: _qcolor = self.item(0, self.columnCount()-1).background() _start = 0 _end = self.rowCount()-1 else: _qcolor = _qcolor_last_line _start = row_range[0] _end = row_range[1] for _row in range(_start, _end): #if not reset: #_cell.setFont(self.font_pts8) if 1 in columns: self.item(_row, 0).setBackground(_qcolor) self.item(_row, 0).setFont(self.font_pts8) if 0 in columns: _cell = QTableWidgetItem("{0}".format(_row+1)) #if not reset: _cell.setFont(self.font_pts8) #_cell.setTextAlignment(Qt.AlignCenter) self.setVerticalHeaderItem(_row, _cell) #last row if reset and 0 in columns: _cell = QTableWidgetItem("{0}".format(last_row[0])) _cell.setFont(self.font_pts8) self.setVerticalHeaderItem(self.rowCount()-1, _cell) self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter) self.item(self.rowCount()-1, 0).setText(str(last_row[1])) self.item(self.rowCount()-1, 0).setBackground(_qcolor) self.item(self.rowCount()-1, 0).setFont(self.font_pts8) elif last_row[0] != " ": _cell = QTableWidgetItem("{0}".format(last_row[0])) _cell.setBackground(_qcolor_last_line) _cell.setFont(self.font_pts8) self.setVerticalHeaderItem(self.rowCount()-1, _cell) if columns: self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter) self.item(self.rowCount()-1, 0).setText(str(last_row[1])) self.item(self.rowCount()-1, 0).setBackground(_qcolor_last_line) self.item(self.rowCount()-1, 0).setFont(self.font_pts8) def widget_update(self): """ Called when self.notif_unison is True """ for _row, pvgate in enumerate(self.pv_gateway): if not pvgate.notify_unison: continue _handle = pvgate.handle _pvd = pvgate.cafe.getPVCache(_handle) if _pvd.status in (self.cyca.ICAFE_CS_NEVER_CONN, self.cyca.ICAFE_CA_OP_CONN_DOWN): pvgate.pvd_previous = _pvd continue pvgate.pvd_previous = _pvd #if timestamps the same - then skip _value = _pvd.value[0] if self.scale_factor != 1: _value = _value * self.scale_factor _value = pvgate.format_display_value(_value) qtwi = QTableWidgetItem(str(_value)+ " ") f = qtwi.font() f.setPointSize(8) qtwi.setFont(f) val_col_no = self.columns_dict['Value'] if self.show_timestamp: ts_col_no = self.columns_dict['Timestamp'] self.setItem(_row, val_col_no, QTableWidgetItem(qtwi)) self.item(_row, val_col_no).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) _ts_date = _pvd.tsDateAsString _ts_str_len = len(_ts_date) _ilength_target = self.format_ts_nano while _ts_str_len < _ilength_target: _ts_date += "0" _ilength_target = _ilength_target - 1 _ts_str_len = len(_ts_date) _ts_str = _ts_date[0: _ts_str_len - (self.format_ts_nano - self.format_ts_decimal_part)] _ts_str_len = len(_ts_str) _ilength_target = self.format_ts_decimal_part if self.format_ts_decimal_part == self.format_ts_deci: if _ts_str_len == self.format_ts_sec: _ts_str += "." while _ts_str_len < _ilength_target: _ts_str += "0" _ilength_target = _ilength_target -1 qtwi = QTableWidgetItem(_ts_str) f = qtwi.font() f.setPointSize(8) qtwi.setFont(f) if self.show_timestamp: self.setItem(_row, ts_col_no, QTableWidgetItem(qtwi)) self.item(_row, ts_col_no).setTextAlignment(Qt.AlignCenter) _prop = pvgate.qt_dynamic_property_get() alarm_severity = _pvd.alarmSeverity if _prop == pvgate.READBACK_ALARM: if alarm_severity == pvgate.cyca.SEV_MAJOR: _bgcolor = pvgate.fg_alarm_major _fgcolor = "black" elif alarm_severity == pvgate.cyca.SEV_MINOR: _bgcolor = pvgate.fg_alarm_minor _fgcolor = "black" elif alarm_severity == pvgate.cyca.SEV_INVALID: _bgcolor = pvgate.fg_alarm_invalid _fgcolor = "#777777" else: _bgcolor = pvgate.fg_alarm_noalarm _fgcolor = "black" #Colors for bg/fg reversed as is the old norm self.item(_row, val_col_no).setBackground(QColor(_bgcolor)) self.item(_row, val_col_no).setForeground(QColor(_fgcolor)) if self.show_timestamp: self.item(_row, ts_col_no).setBackground(QColor(_bgcolor)) self.item(_row, ts_col_no).setForeground(QColor(_fgcolor)) elif _prop == pvgate.READBACK_STATIC: self.item(_row, val_col_no).setBackground( QColor(pvgate.bg_readback)) #self.item(_row, val_col_no).setBackground( # QColor('#aeae66')) if self.show_timestamp: self.item(_row, ts_col_no).setBackground( QColor(pvgate.bg_readback)) elif _prop == pvgate.DISCONNECTED: self.item(_row, val_col_no).setBackground( QColor("#ffffff")) self.item(_row, val_col_no).setForeground( QColor("#777777")) if self.show_timestamp: self.item(_row, ts_col_no).setBackground(QColor("#ffffff")) self.item(_row, ts_col_no).setForeground(QColor("#777777")) else: print(_prop, "widget_update unknown in element/row", _row, _row+1) QApplication.processEvents() def __init__(self, parent=None, pv_list: list = ["PV_NAME_NOT_GIVEN"], monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = True, prefix: str = "", suffix: str = "", ts_res: str = "milli", init_column: bool = False, init_list: list = [], standby_column: bool = False, standby_list: list = [], standby_values: list = [], set_delay: float = 0.0, notify_freq_hz: int = 0, notify_unison: bool = True, precision: int = 0, scale_factor: float = 1, show_timestamp: bool = True, pv_list_show: list = None): super().__init__() self.columns_dict = {} _column_dict_value = 0 if pv_list_show is not None: if pv_list_show[0]: self.columns_dict['PV'] = _column_dict_value _column_dict_value += 1 else: self.columns_dict['PV'] = _column_dict_value _column_dict_value += 1 if init_column: self.columns_dict['Init'] = _column_dict_value _column_dict_value += 1 if standby_column: self.columns_dict['Standby'] = _column_dict_value _column_dict_value += 1 self.columns_dict['Value'] = _column_dict_value if show_timestamp: _column_dict_value += 1 self.columns_dict['Timestamp'] = _column_dict_value _column_dict_value += 1 self.columns_dict['Reconnect'] = _column_dict_value self.setWindowModality(Qt.ApplicationModal) self.no_columns = _column_dict_value + 1 self.init_column = init_column self.init_list = init_list if self.init_column and not self.init_list: self.init_list = pv_list self.standby_column = standby_column self.standby_list = standby_list if self.standby_column and not self.standby_list: self.standby_list = pv_list self.standby_values = standby_values self.set_delay = set_delay self.icount = 0 self.notify_freq_hz = abs(notify_freq_hz) self.notify_freq_hz_default = self.notify_freq_hz self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \ int(1000 / self.notify_freq_hz) self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz) self.precision = precision self.scale_factor = scale_factor self.show_timestamp = show_timestamp self.format_ts_nano = 31 #max length of date self.format_ts_micro = 28 self.format_ts_milli = 25 self.format_ts_deci = 23 #-8 self.format_ts_sec = 21 if "nano" in ts_res.lower(): self.format_ts_decimal_part = self.format_ts_nano elif "micro" in ts_res.lower(): self.format_ts_decimal_part = self.format_ts_micro elif "milli" in ts_res.lower(): self.format_ts_decimal_part = self.format_ts_milli elif "deci" in ts_res.lower(): self.format_ts_decimal_part = self.format_ts_deci elif "sec" in ts_res.lower(): self.format_ts_decimal_part = self.format_ts_sec else: self.format_ts_decimal_part = self.format_ts_milli self.pv2item_dict = {} self.pv_list = pv_list self.pv_gateway = [None] * len(self.pv_list) self.pv_list_show = pv_list_show if self.pv_list_show is None: self.pv_list_show = self.pv_list _color_mode = [None] * len(self.pv_list) if color_mode is not None: if isinstance(color_mode, list): for i in range(0, len(color_mode)): _color_mode[i] = color_mode[i] else: for i in range(0, len(_color_mode)): _color_mode[i] = color_mode for i in range(0, len(self.pv_list)): self.pv_gateway[i] = PVGateway( parent, self.pv_list[i], monitor_callback, pv_within_daq_group, _color_mode[i], show_units, prefix, suffix, connect_triggers=False, notify_freq_hz=self.notify_freq_hz, notify_unison=self.notify_unison, precision=self.precision) self.pv_gateway[i].is_initialize_complete() self.pv_gateway[i].trigger_connect.connect( self.receive_connect_update) self.pv_gateway[i].trigger_monitor_str.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_int.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_float.connect( self.receive_monitor_update) self.pv_gateway[i].widget_class = "QTableWidgetItem" self.pv_gateway[i].qt_property_initial_values( qt_object_name=self.pv_gateway[i].PV_READBACK, tool_tip=False) #required for reconnect self.cafe = self.pv_gateway[0].cafe self.cyca = self.pv_gateway[0].cyca self.timer = None if self.notify_unison: self.timer = QTimer() self.timer.timeout.connect(self.widget_update) self.timer.singleShot(0, self.widget_update) self.timer.start(self.notify_milliseconds) self.reconnect_button = None self.configure_widget() #Connect only deals with colours - only helps on reconnect # In any case monitors take over #Got to do this earlier or emit immediately after!! for i in range(0, len(self.pv_gateway)): if self.cafe.isConnected(self.pv_gateway[i].pv_name): self.pv_gateway[i].trigger_connect.emit( self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), self.pv_gateway[i].cyca.ICAFE_CS_CONN) for i in range(0, len(self.pv_gateway)): if not self.pv_gateway[i].pv_within_daq_group: self.pv_gateway[i].monitor_start() if init_column: self.update_init_values() if standby_column: self.update_standby_values() self.configure_context_menu() def configure_context_menu(self): self.table_context_menu = QMenu() self.table_context_menu.setObjectName("contextMenu") self.table_context_menu.setWindowModality(Qt.NonModal) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.table_context_menu.addSection("---") action1 = QAction("Configure Table PVs", self) action1.triggered.connect(self.display_table_parameters) self.table_context_menu.addAction(action1) if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"): self.table_context_menu.addSection("---") QApplication.processEvents() def set_standby_values(self, pv_list: list = []): if self.init_column: self.init_value_button.setEnabled(False) self.restore_value_button.setEnabled(False) _text = self.standby_value_button.text() self.standby_value_button.setText("Downing") self.standby_value_button.setEnabled(False) if self.reconnect_button is not None: self.reconnect_button.setEnabled(False) _set_values_dict = self.get_standby_values() #return 1,2,3 if not pv_list: _pvs_to_set, _values_to_set = zip(*_set_values_dict.items()) #zip returns tuples _pvs_to_set = list(_pvs_to_set) _values_to_set = list(_values_to_set) else: _pvs_to_set = [] _values_to_set = [] for pv in pv_list: if pv in _set_values_dict.keys(): _pvs_to_set.append(pv) _values_to_set.append(_set_values_dict[pv]) #self.standby_value_button.setEnabled(False) #QApplication.processEvents(QEventLoop.AllEvents, 1.0) if self.set_delay <= 0: status, status_list = self.cafe.setScalarList(_pvs_to_set, _values_to_set) else: status_list = [self.cyca.ICAFE_NORMAL] * len(_pvs_to_set) status = self.cyca.ICAFE_NORMAL for i, (pv, val) in enumerate(zip(_pvs_to_set, _values_to_set)): status_list[i] = self.cafe.set(pv, val) if status_list[i] != self.cyca.ICAFE_NORMAL: status = status_list[i] ##self.standby_value_button.setEnabled(False) ##QApplication.processEvents(QEventLoop.AllEvents, 1.0) time.sleep(self.set_delay) ##self.standby_value_button.setEnabled(False) #QApplication.sendPostedEvents() QApplication.processEvents(QEventLoop.AllEvents, 1) if status != self.cyca.ICAFE_NORMAL: _mess = ("The following devices reported an error " + "in 'set' operation:") for i, status_value in enumerate(status_list): if status_value != self.cyca.ICAFE_NORMAL: _mess += ("\n" + _pvs_to_set[i] + " has status = " + str(status_value) + " " + self.cafe.getStatusCodeAsString(status_value) + " " + self.cafe.getStatusInfo(status_value)) qm = QMessageBox() qm.setText(_mess) #qm.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel) #qm.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) #qm.setFixedWidth(800) ##self.standby_value_button.setEnabled(False) ##QApplication.sendPostedEvents() qm.exec() #QApplication.processEvents() if self.init_column: self.init_value_button.setEnabled(True) self.restore_value_button.setEnabled(True) self.standby_value_button.setText(_text) self.standby_value_button.setEnabled(True) if self.reconnect_button is not None: self.reconnect_button.setEnabled(True) return status, status_list, _pvs_to_set def restore_init_values(self, pv_list: list = []): _text = self.restore_value_button.text() self.restore_value_button.setText("Restoring") self.restore_value_button.setEnabled(False) if self.init_column: self.init_value_button.setEnabled(False) if self.standby_column: self.standby_value_button.setEnabled(False) if self.reconnect_button is not None: self.reconnect_button.setEnabled(False) _set_values_dict = self.get_init_values() if not pv_list: _pvs_to_set, _values_to_set = zip(*_set_values_dict.items()) #zip returns tuples _pvs_to_set = list(_pvs_to_set) _values_to_set = list(_values_to_set) else: _pvs_to_set = [] _values_to_set = [] for pv in pv_list: if pv in _set_values_dict.keys(): _pvs_to_set.append(pv) _values_to_set.append(_set_values_dict[pv]) if self.set_delay <= 0: status, status_list = self.cafe.setScalarList(_pvs_to_set, _values_to_set) else: status_list = [self.cyca.ICAFE_NORMAL] * len(_pvs_to_set) status = self.cyca.ICAFE_NORMAL for i, (pv, val) in enumerate(zip(_pvs_to_set, _values_to_set)): status_list[i] = self.cafe.set(pv, val) if status_list[i] != self.cyca.ICAFE_NORMAL: status = status_list[i] time.sleep(self.set_delay) QApplication.processEvents(QEventLoop.AllEvents, 1) if status != self.cyca.ICAFE_NORMAL: _mess = ("The following device(s) reported an error " + "in 'set' operation:") for i, status_value in enumerate(status_list): if status_value != self.cyca.ICAFE_NORMAL: _mess += ("\n" + _pvs_to_set[i] + " has status = " + str(status_value) + " " + self.cafe.getStatusCodeAsString(status_value) + " " + self.cafe.getStatusInfo(status_value)) qm = QMessageBox() qm.setText(_mess) qm.exec() QApplication.processEvents() if self.init_column: self.init_value_button.setEnabled(True) self.restore_value_button.setText(_text) self.restore_value_button.setEnabled(True) if self.standby_column: self.standby_value_button.setEnabled(True) if self.reconnect_button is not None: self.reconnect_button.setEnabled(True) return status, status_list, _pvs_to_set def is_same_as_init_values(self): _init_values_dict = self.get_column_values(self.columns_dict['Init']) _pvs, _init_values = zip(*_init_values_dict.items()) _current_values_dict = self.get_column_values( self.columns_dict['Value']) _pvs, _current_values = zip(*_current_values_dict.items()) #zip returns tuples return bool(func_reduce(lambda i, j: i and j, map( lambda m, k: m == k, _init_values, _current_values), True)) #if func_reduce(lambda i, j: i and j, map( # lambda m, k: m == k, _init_values, _current_values), True): # return True #else: # return False def get_column_values(self, column_no): _values_dict = {} _start = 0 _end = len(self.pv_gateway) _pvs = [None] * _end _values_str = [None] * _end _values = [None] * _end for _row in range(_start, _end): _values_str[_row] = self.item(_row, column_no).text() _pvs[_row] = self.item(_row, 0).text() _value_list = [float(_value_list) for _value_list in re.findall( r'-?\d+\.?\d*', _values_str[_row])] if not _value_list: print("row", _row, "values", _values_str[_row], _pvs[_row]) _values[_row] = _values_str[_row] #Can be enum string else: _values[_row] = _value_list[0] if _pvs[_row] in self.pv_list_show: _values_dict[self.pv_gateway[_row].pv_name] = _values[_row] else: _values_dict[_row] = _values[_row] return _values_dict #_pvs_to_set, _values_to_set def get_init_values(self): return self.get_column_values(self.columns_dict['Init']) def get_standby_values(self): return self.get_column_values(self.columns_dict['Standby']) def get_init_values_previous(self): _set_values_dict = {} _start = 0 _end = len(self.pv_gateway) _pvs_to_set = [None] * _end _values_to_set_str = [None] * _end _values_to_set = [None] * _end for _row in range(_start, _end): _values_to_set_str[_row] = self.item( _row, self.columns_dict['Init']).text() _pvs_to_set[_row] = self.item(_row, self.columns_dict['PV']).text() _value_list = [float(_value_list) for _value_list in re.findall( r'-?\d+\.?\d*', _values_to_set_str[_row])] if not _value_list: print("//row", _row, "values", _values_to_set_str[_row], _pvs_to_set[_row]) _values_to_set[_row] = _values_to_set_str[_row] #Can be enum str else: _values_to_set[_row] = _value_list[0] if _pvs_to_set[_row] in self.init_list: _set_values_dict[ self.pv_gateway[_row].pv_name] = _values_to_set[_row] return _set_values_dict def update_standby_values(self): _start = 0 _end = len(self.standby_values) if 'Standby' in self.columns_dict: _column_no = self.columns_dict['Standby'] else: return for _row in range(_start, _end): _value = self.standby_values[_row] if self.scale_factor != 1: _value = _value * self.scale_factor _value = self.pv_gateway[_row].format_display_value(_value) qtwi = QTableWidgetItem(str(_value)+ " ") _f = qtwi.font() _f.setPointSize(8) qtwi.setFont(_f) self.setItem(_row, _column_no, qtwi) self.item(_row, _column_no).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) def set_init_values(self, values): _start = 0 _end = min(len(self.pv_gateway), len(values)) if 'Init' in self.columns_dict: _column_no = self.columns_dict['Init'] else: return for _row in range(_start, _end): _value = self.pv_gateway[_row].format_display_value(values[_row]) qtwi = QTableWidgetItem(str(_value)+ " ") _f = qtwi.font() _f.setPointSize(8) qtwi.setFont(_f) self.setItem(_row, _column_no, qtwi) self.item(_row, _column_no).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) def update_init_values(self): _start = 0 _end = len(self.pv_gateway) if 'Init' in self.columns_dict: _column_no = self.columns_dict['Init'] else: return for _row in range(_start, _end): _handle = self.pv_gateway[_row].handle _value = self.pv_gateway[_row].cafe.getCache(_handle) if _value is not None: if self.scale_factor != 1: _value = _value * self.scale_factor _value = self.pv_gateway[_row].format_display_value(_value) qtwi = QTableWidgetItem(str(_value)+ " ") _f = qtwi.font() _f.setPointSize(8) qtwi.setFont(_f) self.setItem(_row, _column_no, qtwi) self.item(_row, _column_no).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) def configure_widget(self): _column_width_pvname = 180 _column_width_value = 90 _column_width_timestamp = 210 _column_width_checkbox = 22 item_font = QTableWidgetItem().font() item_font.setPixelSize(13) self.setRowCount(len(self.pv_gateway)+1) self.setColumnCount(self.no_columns) self.setEditTriggers(QAbstractItemView.NoEditTriggers) self.resizeColumnsToContents() self.resizeRowsToContents() #self.horizontalHeader().setStretchLastSection(True); if 'PV' in self.columns_dict.keys(): self.setColumnWidth(self.columns_dict['PV'], _column_width_pvname) _pv_column = self.columns_dict['PV'] self.setColumnWidth(self.columns_dict['Value'], _column_width_value) if 'Init' in self.columns_dict.keys(): self.setColumnWidth(self.columns_dict['Init'], _column_width_value) if 'Timestamp' in self.columns_dict.keys(): self.setColumnWidth(self.columns_dict['Timestamp'], _column_width_timestamp) self.setColumnWidth(self.columns_dict['Reconnect'], _column_width_checkbox) for i in range(0, len(self.pv_gateway)): istart = 1 if 'PV' in self.columns_dict.keys(): qtwt = QTableWidgetItem(self.pv_list_show[i]) f = qtwt.font() f.setPointSize(8) qtwt.setFont(f) self.setItem(i, _pv_column, qtwt) self.item(i, _pv_column).setTextAlignment( Qt.AlignmentFlag(Qt.AlignHCenter|Qt.AlignVCenter)) else: istart = 0 for i_column in range(istart, self.no_columns-1): self.setItem(i, i_column, QTableWidgetItem(str(""))) self.item(i, i_column).setTextAlignment( Qt.AlignmentFlag(Qt.AlignHCenter|Qt.AlignVCenter)) self.pv2item_dict[self.pv_gateway[i]] = i cb_item = QTableWidgetItem() cb_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) cb_item.setCheckState(Qt.Unchecked) cb_item.setTextAlignment(Qt.AlignCenter) cb_item.setToolTip(self.pv_gateway[i].pv_name) self.setItem(i, self.no_columns-1, cb_item) self.item(i, self.no_columns-1).setTextAlignment(Qt.AlignCenter) if self.init_column: self.init_widget = QWidget() self.init_layout = QHBoxLayout(self.init_widget) self.init_value_button = QPushButton() self.init_value_button.setText("Update") f = self.init_value_button.font() f.setPointSize(8) self.init_value_button.setFont(f) self.init_value_button.setFixedWidth(64) self.init_value_button.clicked.connect(self.update_init_values) self.init_value_button.setToolTip( ("Stores initial, pre-measurement value. Update is also " + "typically executed automatically before analysis procedure.")) self.init_layout.addWidget(self.init_value_button) self.init_layout.setAlignment(Qt.AlignCenter) self.init_layout.setContentsMargins(1, 1, 1, 0) #Required self.init_widget.setLayout(self.init_layout) if 'PV' in self.columns_dict: self.setCellWidget(len(self.pv_gateway), 0, self.init_widget) else: self.setCellWidget(len(self.pv_gateway), self.columns_dict['Init'], self.init_widget) self.restore_widget = QWidget() self.restore_layout = QHBoxLayout(self.restore_widget) self.restore_value_button = QPushButton() self.restore_value_button.setStyleSheet( "QPushButton{background-color: rgb(212, 219, 157);}") self.restore_value_button.setText("Restore") f = self.restore_value_button.font() f.setPointSize(8) self.restore_value_button.setFont(f) self.restore_value_button.setFixedWidth(80) self.restore_value_button.clicked.connect(self.restore_init_values) self.restore_value_button.setToolTip( ("Restore devices to their pre-measurement values")) self.restore_layout.addWidget(self.restore_value_button) self.restore_layout.setAlignment(Qt.AlignCenter) self.restore_layout.setContentsMargins(1, 1, 0, 0) self.restore_widget.setLayout(self.restore_layout) if 'PV' in self.columns_dict: self.setCellWidget(len(self.pv_gateway), self.columns_dict['Init'], self.restore_widget) if self.standby_column: _standby_widget = QWidget() _standby_layout = QHBoxLayout(_standby_widget) self.standby_value_button = QPushButton() self.standby_value_button.setStyleSheet( "QPushButton{background-color: rgb(212, 219, 157);}") self.standby_value_button.setText("Standby") f = self.standby_value_button.font() f.setPointSize(8) self.standby_value_button.setFont(f) self.standby_value_button.setFixedWidth(80) self.standby_value_button.clicked.connect(self.set_standby_values) self.standby_value_button.setToolTip( ("Set devices to their pre-set standby values")) _standby_layout.addWidget(self.standby_value_button) _standby_layout.setAlignment(Qt.AlignCenter) _standby_layout.setContentsMargins(1, 1, 0, 0) _standby_widget.setLayout(_standby_layout) self.setCellWidget(len(self.pv_gateway), self.columns_dict['Standby'], _standby_widget) #Do not display no for last row (Reconnect button) _row_digit_last_cell = QTableWidgetItem(str("")) self.setVerticalHeaderItem(len(self.pv_gateway), _row_digit_last_cell) self.setItem(len(self.pv_gateway), 0, QTableWidgetItem(str(""))) _qwb = QWidget() self.reconnect_button = reconnectQPushButton(self) #self required f = self.reconnect_button.font() if 'Timestamp' in self.columns_dict.keys(): f.setPixelSize(11) #previous 13 self.reconnect_button.setFixedWidth(100) else: f.setPixelSize(11) #6 self.reconnect_button.setFixedWidth(66) #58 self.reconnect_button.setFont(f) self.reconnect_button.setText("Reconnect") self.reconnect_button.setToolTip("Reconnect selected/checked channels") _layout = QHBoxLayout(_qwb) _layout.addWidget(self.reconnect_button) _layout.setAlignment(Qt.AlignCenter) _layout.setContentsMargins(0, 0, 0, 0) #Required #_reconnect_button self.setCellWidget(len(self.pv_gateway), self.no_columns-2, _qwb) self.cb_item_all = QCheckBox() self.cb_item_all.setCheckState(Qt.Unchecked) self.cb_item_all.stateChanged.connect(self.reconnectStateChanged) self.cb_item_all.setObjectName("Reconnect") self.setCellWidget(len(self.pv_gateway), self.no_columns-1, self.cb_item_all) if 'PV' in self.columns_dict.keys(): header_item = QTableWidgetItem("Process Variable") f = header_item.font() f.setPixelSize(13) header_item.setFont(f) self.setHorizontalHeaderItem(self.columns_dict['PV'], header_item) if 'Init' in self.columns_dict.keys(): header_item = QTableWidgetItem("Initial Value") f = header_item.font() f.setPixelSize(13) header_item.setFont(f) self.setHorizontalHeaderItem(self.columns_dict['Init'], header_item) header_item = QTableWidgetItem("Value") f = header_item.font() f.setPixelSize(13) header_item.setFont(f) self.setHorizontalHeaderItem(self.columns_dict['Value'], header_item) if 'Timestamp' in self.columns_dict.keys(): header_item = QTableWidgetItem("Timestamp") f = header_item.font() f.setPixelSize(13) header_item.setFont(f) self.setHorizontalHeaderItem(self.columns_dict['Timestamp'], header_item) header_item = QTableWidgetItem("R") f = header_item.font() f.setPixelSize(13) header_item.setFont(f) self.setHorizontalHeaderItem(self.columns_dict['Reconnect'], header_item) self.setFocusPolicy(Qt.NoFocus) self.setEditTriggers(QAbstractItemView.NoEditTriggers) self.setSelectionMode(QAbstractItemView.NoSelection) self.verticalHeader().setDefaultAlignment(Qt.AlignRight) self.verticalHeader().setFixedWidth(22) _fm_font = QFont("Sans Serif") _fm_font.setPointSize(12) fm = QFontMetricsF(_fm_font) _factor = 1 if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): _factor = 1.18 self.setFixedHeight( int(fm.lineSpacing() * _factor * (len(self.pv_gateway)+3))) _min_table_width = 620 if not self.init_column else 650 self.setMinimumWidth(_min_table_width) for _row in range(0, len(self.pv_gateway)): if 'PV' in self.columns_dict.keys(): self.item(_row, _pv_column).setForeground(QColor("#000000")) istart = 1 else: istart = 0 for i_column in range(istart, self.no_columns-2): self.item(_row, i_column).setForeground(QColor("#000000")) self.item(_row, i_column).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) self.item(_row, self.columns_dict['Value']).setBackground( QColor("#ffffff")) if 'Timestamp' in self.columns_dict.keys(): self.item(_row, self.columns_dict['Timestamp']).setTextAlignment( Qt.AlignCenter) self.item(_row, self.columns_dict['Timestamp']).setBackground( QColor("#ffffff")) @Slot(int) def reconnectStateChanged(self, state): if state == Qt.Unchecked: for i in range(0, len(self.pv_gateway)): self.item(i, self.columns_dict['Reconnect']).setCheckState( Qt.Unchecked) else: for i in range(0, len(self.pv_gateway)): self.item(i, self.columns_dict['Reconnect']).setCheckState( Qt.Checked) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): _row = self.pv2item_dict[self.sender()] self.pv_gateway[_row].time_monotonic = time.monotonic() if self.scale_factor != 1: value = value * self.scale_factor _value = self.pv_gateway[_row].format_display_value(value) #if 'QMD10' in self.pv_gateway[_row].pv_name: # print(_value + " from widgets.py" + self.pv_gateway[_row].pv_name) qtwi = QTableWidgetItem(str(_value) + " ") f = qtwi.font() f.setPointSize(8) qtwi.setFont(f) qtwi.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable) #qtwi.setEditTriggers(QAbstractItemView.AllEditTriggers) self.setItem(_row, self.columns_dict['Value'], qtwi) #self.item(_row, self.columns_dict['Value']).setFlags( #Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable) #self.item(_row, self.columns_dict['Value']).setEditTriggers( #QAbstractItemView.AllEditTriggers) self.item(_row, self.columns_dict['Value']).setTextAlignment( Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter)) if 'Timestamp' in self.columns_dict.keys(): _handle = self.pv_gateway[_row].handle _pvd = self.pv_gateway[_row].cafe.getPVCache(_handle) _ts_date = _pvd.tsDateAsString _ts_str_len = len(_ts_date) _ilength_target = self.format_ts_nano while _ts_str_len < _ilength_target: _ts_date += "0" _ilength_target = _ilength_target -1 ##ts_str_len = len(_ts_date) _ts_str = _ts_date[0: _ts_str_len-( self.format_ts_nano-self.format_ts_decimal_part)] _ts_str_len = len(_ts_str) _ilength_target = self.format_ts_decimal_part if self.format_ts_decimal_part == self.format_ts_deci: if _ts_str_len == self.format_ts_sec: _ts_str += "." while _ts_str_len < _ilength_target: _ts_str += "0" _ilength_target = _ilength_target -1 qtwi = QTableWidgetItem(_ts_str) f = qtwi.font() f.setPointSize(8) qtwi.setFont(f) self.setItem(_row, self.columns_dict['Timestamp'], qtwi) self.item(_row, self.columns_dict['Timestamp']).setTextAlignment( Qt.AlignCenter) _prop = self.pv_gateway[_row].qt_dynamic_property_get() if _prop == self.pv_gateway[_row].READBACK_ALARM: if alarm_severity == self.pv_gateway[_row].cyca.SEV_MAJOR: _bgcolor = self.pv_gateway[_row].fg_alarm_major _fgcolor = "black" elif alarm_severity == self.pv_gateway[_row].cyca.SEV_MINOR: _bgcolor = self.pv_gateway[_row].fg_alarm_minor _fgcolor = "black" elif alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID: _bgcolor = self.pv_gateway[_row].fg_alarm_invalid _fgcolor = "#777777" else: _bgcolor = self.pv_gateway[_row].fg_alarm_noalarm _fgcolor = "black" #Colors for bg/fg reversed as is the old norm self.item(_row, self.columns_dict['Value']).setBackground( QColor(_bgcolor)) self.item(_row, self.columns_dict['Value']).setForeground( QColor(_fgcolor)) if 'Timestamp' in self.columns_dict.keys(): self.item(_row, self.columns_dict['Timestamp']).setBackground( QColor(_bgcolor)) self.item(_row, self.columns_dict['Timestamp']).setForeground( QColor(_fgcolor)) elif _prop == self.pv_gateway[_row].DISCONNECTED or \ alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID: self.item(_row, self.columns_dict['Value']).setBackground( QColor("#ffffff")) self.item(_row, self.columns_dict['Value']).setForeground( QColor("#777777")) if 'Timestamp' in self.columns_dict.keys(): self.item(_row, self.columns_dict['Timestamp']).setBackground( QColor("#ffffff")) self.item(_row, self.columns_dict['Timestamp']).setForeground( QColor("#777777")) elif _prop == self.pv_gateway[_row].READBACK_STATIC: self.item(_row, self.columns_dict['Value']).setBackground( QColor(self.pv_gateway[_row].bg_readback)) #self.item(_row, self.columns_dict['Value']).setBackground( # QColor("#ffffe1")) #self.item(_row, self.columns_dict['Value']).setFlags( #Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable) if 'Timestamp' in self.columns_dict.keys(): self.item(_row, self.columns_dict['Timestamp']).setBackground( QColor(self.pv_gateway[_row].bg_readback)) else: print(_prop, self.pv_gateway[_row].DISCONNECTED, "(in monitor) unknown in element/row no.", _row, _row+1) QApplication.processEvents(QEventLoop.AllEvents, 10) @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' _row = self.pv2item_dict[self.sender()] self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, post_display=False) _prop = self.pv_gateway[_row].qt_dynamic_property_get() #self.post_display_value(status) if _prop == self.pv_gateway[_row].DISCONNECTED: self.item(_row, self.columns_dict['Value']).setBackground( QColor("#ffffff")) self.item(_row, self.columns_dict['Value']).setForeground( QColor("#777777")) if 'Timestamp' in self.columns_dict.keys(): self.item(_row, self.columns_dict['Timestamp']).setBackground( QColor("#ffffff")) self.item(_row, self.columns_dict['Timestamp']).setForeground( QColor("#777777")) QApplication.processEvents() def table_precision_user_changed(self, new_value): self.pvgateway_precision = new_value for pvgate in self.pv_gateway: if pvgate.pv_ctrl is not None: self.pvgateway_precision = min(pvgate.pv_ctrl.precision, new_value) pvgate.precision_user = self.pvgateway_precision pvgate.precision = self.pvgateway_precision _pvd = self.cafe.getPVCache(pvgate.handle) if _pvd.value[0] is not None: if isinstance(_pvd.value[0], float): pvgate.trigger_monitor_float.emit( _pvd.value[0], _pvd.status, _pvd.alarmSeverity) def table_precision_ioc_reset(self): if self.max_precision_value == self.table_precision_user_wgt.value(): self.table_precision_user_changed(self.max_precision_value) else: self.table_precision_user_wgt.setValue(self.max_precision_value) def table_refresh_rate_changed(self, new_idx): _notify_freq_hz = self.refresh_freq_combox_idx_dict[new_idx] _notify_milliseconds = 0 if _notify_freq_hz == 0 else \ 1000 / _notify_freq_hz self.notify_freq_hz = _notify_freq_hz if _notify_milliseconds == 0: for pvgate in self.pv_gateway: pvgate.notify_unison = False pvgate.notify_milliseconds = _notify_milliseconds pvgate.notify_freq_hz = self.notify_freq_hz pvgate.monitor_stop() time.sleep(0.01) for pvgate in self.pv_gateway: pvgate.monitor_start() else: for pvgate in self.pv_gateway: if not pvgate.notify_unison: pvgate.monitor_stop() for pvgate in self.pv_gateway: pvgate.notify_milliseconds = _notify_milliseconds pvgate.notify_freq_hz = self.notify_freq_hz if not pvgate.notify_unison: pvgate.notify_unison = True pvgate.monitor_start() else: self.cafe.updateMonitorPolicyDeltaMS( pvgate.handle, pvgate.monitor_id, pvgate.notify_milliseconds) if self.timer is not None: self.timer.stop() else: self.timer = QTimer() self.timer.timeout.connect(self.widget_update) self.timer.singleShot(0, self.widget_update) if _notify_milliseconds > 0: self.timer.start(_notify_milliseconds) def table_ts_resolution_changed(self, new_idx): for i, ts_res in enumerate(self.ts_combox_idx_dict.values()): if i == new_idx: self.format_ts_decimal_part = ts_res break for pvgate in self.pv_gateway: _pvd = self.cafe.getPVCache(pvgate.handle) if _pvd.value[0] is not None: if isinstance(_pvd.value[0], float): pvgate.trigger_monitor_float.emit( _pvd.value[0], _pvd.status, _pvd.alarmSeverity) elif isinstance(_pvd.value[0], int): pvgate.trigger_monitor_int.emit( _pvd.value[0], _pvd.status, _pvd.alarmSeverity) else: pvgate.trigger_monitor_str.emit( str(_pvd.value[0]), _pvd.status, _pvd.alarmSeverity) def display_table_parameters(self): display_wgt = QDialog(self) display_wgt.setWindowTitle("PV Parameters") layout = QVBoxLayout() common_label_width = 120 common_wgt_width = 160 common_hbox_width = common_label_width + common_wgt_width + 20 self.initial_value = 0 self.max_precision_value = 0 for i, pvgate in enumerate(self.pv_gateway): if pvgate.pv_ctrl is not None: if pvgate.pv_ctrl.precision > 0: self.max_precision_value = max(self.max_precision_value, pvgate.pv_ctrl.precision) self.initial_value = max(self.initial_value, pvgate.precision) if self.max_precision_value > 0: #precision user _hbox_wgt = QWidget() _hbox = QHBoxLayout() precision_user_label = QLabel("Precision (user):") self.table_precision_user_wgt = QSpinBox(self) self.table_precision_user_wgt.setFocusPolicy(Qt.NoFocus) self.table_precision_user_wgt.setValue(self.initial_value) self.table_precision_user_wgt.setMaximum(self.max_precision_value) self.table_precision_user_wgt.valueChanged.connect( self.table_precision_user_changed) precision_user_label.setAlignment(Qt.AlignLeft) self.table_precision_user_wgt.setAlignment(Qt.AlignLeft) _hbox.addWidget(precision_user_label) _hbox.addWidget(self.table_precision_user_wgt) _hbox.setAlignment(Qt.AlignLeft) _hbox_wgt.setLayout(_hbox) precision_user_label.setFixedWidth(common_label_width) self.table_precision_user_wgt.setFixedWidth(40) _hbox_wgt.setFixedWidth(common_hbox_width) #precision ioc _hbox2_wgt = QWidget() _hbox2 = QHBoxLayout() precision_ioc_label = QLabel("Precision (ioc): ") precision_ioc = QPushButton(self) precision_ioc.setText("Reset") precision_ioc.clicked.connect(self.table_precision_ioc_reset) precision_ioc_label.setAlignment(Qt.AlignLeft) _hbox2.addWidget(precision_ioc_label) _hbox2.addWidget(precision_ioc) _hbox2.setAlignment(Qt.AlignLeft) _hbox2_wgt.setLayout(_hbox2) precision_ioc_label.setFixedWidth(common_label_width) precision_ioc.setFixedWidth(50) _hbox2_wgt.setFixedWidth(common_hbox_width) layout.addWidget(_hbox_wgt) layout.addWidget(_hbox2_wgt) if 'Timestamp' in self.columns_dict.keys(): #time-stamp _hbox4_wgt = QWidget() _hbox4 = QHBoxLayout() ts_label = QLabel("Timestamp: ") self.ts_combox_idx_dict = { 'second (s)': self.format_ts_sec, 'decisecond (ds)': self.format_ts_deci, 'millisecond (ms)': self.format_ts_milli, 'microsecond (\u03bcs)': self.format_ts_micro, 'nanosecond (ns)': self.format_ts_nano} ts_resolution = QComboBox(self) for key, ts_res in self.ts_combox_idx_dict.items(): ts_resolution.addItem(key) _current_idx = 0 for i, (key, ts_res) in enumerate(self.ts_combox_idx_dict.items()): if ts_res == self.format_ts_decimal_part: _current_idx = i break ts_resolution.setCurrentIndex(_current_idx) ts_resolution.currentIndexChanged.connect( self.table_ts_resolution_changed) _hbox4.addWidget(ts_label) _hbox4.addWidget(ts_resolution) _hbox4_wgt.setLayout(_hbox4) ts_label.setFixedWidth(common_label_width) ts_resolution.setFixedWidth(common_wgt_width) _hbox4_wgt.setFixedWidth(common_hbox_width) layout.addWidget(_hbox4_wgt) #precision refresh rate _hbox3_wgt = QWidget() _hbox3 = QHBoxLayout() refresh_freq_label = QLabel("Refresh rate: ") #_default_refresh_val = 0 if self.notify_freq_hz <= 0 else \ # self.notify_freq_hz _default_refresh_val = 0 if self.notify_freq_hz_default <= 0 else \ self.notify_freq_hz_default self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1, 5: 0.5, 6: _default_refresh_val} refresh_freq = QComboBox(self) refresh_freq.addItem('direct') refresh_freq.addItem('{0} Hz'.format( self.refresh_freq_combox_idx_dict[1])) refresh_freq.addItem('{0} Hz'.format( self.refresh_freq_combox_idx_dict[2])) refresh_freq.addItem('{0} Hz'.format( self.refresh_freq_combox_idx_dict[3])) refresh_freq.addItem('{0} Hz'.format( self.refresh_freq_combox_idx_dict[4])) refresh_freq.addItem('{0} Hz'.format( self.refresh_freq_combox_idx_dict[5])) _default_text = 'default (direct)' if _default_refresh_val == 0 else \ 'default ({0} Hz)'.format(self.refresh_freq_combox_idx_dict[6]) refresh_freq.addItem(_default_text) for key, value in self.refresh_freq_combox_idx_dict.items(): if value == self.notify_freq_hz: refresh_freq.setCurrentIndex(key) break refresh_freq.currentIndexChanged.connect( self.table_refresh_rate_changed) _hbox3.addWidget(refresh_freq_label) _hbox3.addWidget(refresh_freq) _hbox3_wgt.setLayout(_hbox3) refresh_freq_label.setFixedWidth(common_label_width) refresh_freq.setFixedWidth(common_wgt_width) _hbox3_wgt.setFixedWidth(common_hbox_width) layout.addWidget(_hbox3_wgt) layout.setAlignment(Qt.AlignLeft) layout.setContentsMargins(10, 0, 0, 0) layout.setSpacing(0) display_wgt.setMinimumWidth(340) display_wgt.setLayout(layout) display_wgt.exec() def mousePressEvent(self, event): row = self.indexAt(event.pos()).row() if row > -1: if row < len(self.pv_list): self.pv_gateway[row].mousePressEvent(event) else: button = event.button() if button == Qt.RightButton: self.table_context_menu.exec(QCursor.pos()) self.clearFocus() #remove highlighting which persists after mouse leaves def mouseMoveEvent(self, event): pass def leaveEvent(self, event): self.clearSelection() self.clearFocus() del event class QMessageWidget(QListWidget): """Log message window.""" def __init__(self, parent=None): super(QMessageWidget, self).__init__(parent) self.myItem = None self.setSelectionMode(QAbstractItemView.ExtendedSelection) self.setFocusPolicy(Qt.StrongFocus) def leaveEvent(self, event): if self.myItem: self.clearSelection() self.clearFocus() del event def mousePressEvent(self, event): item = self.itemAt(event.x(), event.y()) if item: self.myItem = item self.setCurrentItem(self.myItem) def keyPressEvent(self, event): if event.matches(QKeySequence.Copy): nitem = event.count() if nitem: if self.myItem is not None: _str = self.myItem.text() QApplication.clipboard().setText(_str) class QResultsWidget: """Results table""" def __init__(self, summary_dict=None, table_dict=None): self.summary_dict = summary_dict self.table_dict = table_dict self._group_box = None def group_box(self, title=""): self._group_box = QGroupBox(title) self._group_box.setObjectName("OUTERLEFT") _vbox = QVBoxLayout() _qspace = QFrame() _qspace.setFixedHeight(10) _vbox.addWidget(_qspace) _font = QFont("Sans Serif", 10) longest_str_item1 = "" longest_str_item2 = "" for i, (label, text) in enumerate(self.summary_dict.items()): if len(str(label)) > len(longest_str_item1): longest_str_item1 = str(label) if len(str(text)) > len(longest_str_item2): longest_str_item2 = str(text) fm = QFontMetricsF(_font) _factor = 1.15 if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): _factor = 1.18 qrect1 = fm.boundingRect(longest_str_item1) qrect2 = fm.boundingRect(longest_str_item2) _width_scaling_factor = 1.5 _width_scaling_factor_le = 1.15 _widget_height = 25 for i, (label, text) in enumerate(self.summary_dict.items()): #print(label, text) qlabel = QLabel(label) qle = QLineEdit(text) qlabel.setFont(_font) qlabel.setStyleSheet(("QLabel{color:black;" + "margin:0px; padding:2px;}")) qlabel.setFixedWidth(int(qrect1.width() * _width_scaling_factor)) qlabel.setFixedHeight(_widget_height) qle.setFocusPolicy(Qt.NoFocus) qle.setFont(_font) qle.setStyleSheet(("QLineEdit{color:blue;" + "background-color: lightgray;" + "qproperty-readOnly: true;" + "margin:0px; padding:2px;}")) qle.setFixedWidth(int(qrect2.width() * _width_scaling_factor_le)) qle.setFixedHeight(_widget_height) qle.setAlignment(Qt.AlignRight) _hbox_widget = QWidget() _hbox = QHBoxLayout() _hbox.addWidget(qlabel) _hbox.addWidget(qle) _hbox_widget.setLayout(_hbox) _hbox.setAlignment(Qt.AlignCenter) _hbox.setContentsMargins(0, 2, 0, 0) _vbox.addWidget(_hbox_widget) _vbox.setContentsMargins(0, 0, 0, 0) _vbox.setAlignment(Qt.AlignCenter|Qt.AlignTop) _vbox2_widget = QWidget() _vbox2 = QVBoxLayout() _vbox2.setContentsMargins(0, 20, 0, 40) table = QTableWidget(len(self.table_dict)-1, 2) table.verticalHeader().setVisible(False) table.setFocusPolicy(Qt.NoFocus) #table.setFont(_font) longest_str_item1 = "" longest_str_item2 = "" for i, (label, text) in enumerate(self.table_dict.items()): item1 = QTableWidgetItem(str(label)) item2 = QTableWidgetItem(str(text)) item1.setTextAlignment(Qt.AlignCenter) item2.setTextAlignment(Qt.AlignCenter) item1.setForeground(QColor("black")) item2.setForeground(QColor("black")) if i%2 == 0: item1.setBackground(QColor("lightgray")) item2.setBackground(QColor("lightgray")) if len(str(label)) > len(longest_str_item1): longest_str_item1 = str(label) if len(str(text)) > len(longest_str_item2): longest_str_item2 = str(text) if i == 0: #item1.setFont(_font) #item2.setFont(_font) table.setHorizontalHeaderItem(0, item1) table.setHorizontalHeaderItem(1, item2) else: table.setItem(i-1, 0, item1) table.setItem(i-1, 1, item2) fm = QFontMetricsF(_font) _factor = 1.2 if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"): _factor = 1.18 qrect = fm.boundingRect(longest_str_item1 + longest_str_item2) _width_scaling_factor = 1.04 table.resizeColumnsToContents() table.resizeRowsToContents() table.setFixedHeight(int(fm.lineSpacing() * _factor * len( self.table_dict)) + fm.lineSpacing()*2) table.setFixedWidth(int(qrect.width() * _width_scaling_factor)) _vbox2.addWidget(table) _vbox2.setAlignment(Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop)) _vbox2_widget.setLayout(_vbox2) _vbox.addWidget(_vbox2_widget) self._group_box.setLayout(_vbox) self._group_box.setContentsMargins(20, 20, 20, 20) self._group_box.setAlignment(Qt.AlignTop) self._group_box.setFixedHeight(int( table.height() + (_widget_height*len(self.summary_dict)))) self._group_box.setFixedWidth(int(table.width() + 20)) return self._group_box class QResultsTableWidget(): """Results table""" def __init__(self, column_headings=None): self.column_headings = column_headings self._group_box = None def group_box(self, title="Table of Results"): self._group_box = QGroupBox(title) self._group_box.setObjectName("OUTER") _font = QFont("Sans Serif", 10) _vbox2_widget = QWidget() _vbox2 = QVBoxLayout() _vbox2.setContentsMargins(0, 20, 0, 40) table = QTableWidget(1, len(self.column_headings)) table.verticalHeader().setVisible(True) table.setFocusPolicy(Qt.NoFocus) table.setFont(_font) for i, heading in enumerate(self.column_headings): _item = QTableWidgetItem(str(heading)) table.setHorizontalHeaderItem(i, _item) table.resizeColumnsToContents() table.resizeRowsToContents() table.setFixedHeight(400) _vbox2.addWidget(table) _vbox2.setAlignment(Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop)) _vbox2_widget.setLayout(_vbox2) self._group_box.setLayout(_vbox2) self._group_box.setContentsMargins(20, 20, 20, 20) self._group_box.setAlignment(Qt.AlignTop) self._group_box.setFixedWidth(int(table.width() + 20)) return self._group_box class QHDFDockWidget(QDockWidget): def __init__(self, title=None, parent=None): super().__init__(title, parent) self.parent = parent self.is_docked = True self.geometry_from_qsettings = self.parent.application_geometry self.topLevelChanged.connect(self._top_level_changed) self.setVisible(False) self.setFloating(False) self.geometry_from_qsettings = self.parent.geometry() def closeEvent(self, event: QCloseEvent): super().closeEvent(event) self.parent.setGeometry(self.geometry_from_qsettings) self.setGeometry(self.geometry_from_qsettings) QApplication.processEvents() self.parent.setGeometry(self.geometry_from_qsettings) def changeEvent(self, event): pass def _top_level_changed(self, is_floating): pass class QNoDockWidget(QDockWidget): def __init__(self, title=None, parent=None): super().__init__(title, parent) self.parent = parent self.is_docked = True self.geometry_from_qsettings = self.parent.application_geometry self.topLevelChanged.connect(self._top_level_changed) self.setVisible(False) self.setFloating(True) x = self.geometry_from_qsettings.x() + 480 # 3500 #screen.width() - widget.width() y = self.geometry_from_qsettings.y() + 350 #100 #screen.height() - widget.height() self.move(x, y) def changeEvent(self, event): if "QAbstractButton" in str(self.sender()): self.geometry_from_qsettings = self.parent.geometry() def _top_level_changed(self): #, is_floating): self.setVisible(False) self.setFloating(True) #ResetGeometry self.parent.setGeometry(self.geometry_from_qsettings) QApplication.processEvents() class CAQStripChart(PlotWidget): '''Channel access enabled pyqtgraph.PlotWidget''' def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'], monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = False, prefix: str = "", suffix: str = "", notify_freq_hz: int = 0, title: str = "", ylabel: str = "", force_ts_align = True, text_label = [], pen_color_idx = 0): super().__init__() self.no_channels = len(pv_list) self.pen_color_idx = pen_color_idx self.text_label = text_label self.found = False self.time_zero = [0] * self.no_channels self.time_delta = [0] * self.no_channels self.pv_list = pv_list self.pv2item_dict = {} self.pv_gateway = [None] * self.no_channels self.pvd_previous_list = [None] * self.no_channels self.val_previous = [None] * self.no_channels self.curve = [None] * self.no_channels for i in range (0, len(self.pv_list)): self.pv_gateway[i] = PVGateway( parent, self.pv_list[i], monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, #connect_callback=self.py_connect_callback, connect_triggers=False, notify_freq_hz=notify_freq_hz, monitor_dbr_time = True) self.pv_gateway[i].is_initialize_complete() self.pvd_previous_list[i] = self.pv_gateway[i].pvd self.pv_gateway[i].trigger_connect.connect( self.receive_connect_update) self.pv_gateway[i].trigger_monitor_str.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_int.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_float.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor.connect( self.receive_monitor_dbr_time) self.pv_gateway[i].widget_class = "PlotWidget" self.pv2item_dict[self.pv_gateway[i]] = i self.cafe = self.pv_gateway[0].cafe self.cyca = self.pv_gateway[0].cyca for i in range(0, len(self.pv_gateway)): if self.cafe.isConnected(self.pv_gateway[i].pv_name): self.pv_gateway[i].trigger_connect.emit( self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), self.pv_gateway[i].cyca.ICAFE_CS_CONN) for i in range(0, len(self.pv_gateway)): if not self.pv_gateway[i].pv_within_daq_group: self.pv_gateway[i].monitor_start() sampleinterval = 0.2 timewindow = 1800.0 self.ts_delta_max = 0.6 # Data stuff self._interval = int(sampleinterval*1000) self._bufsize = 10000 #int(timewindow/0.33) self._bufsize2 = 10000 # int(timewindow/1.33) self.databuffer = [None] * self.no_channels self.timebuffer = [None] * self.no_channels self.x = [None] * self.no_channels self.y = [None] * self.no_channels self.x_shifted = [None] * self.no_channels self.idx = [0] * self.no_channels for i in range(0, self.no_channels): bsize = self._bufsize if i == 0 else self._bufsize2 self.databuffer[i] = collections.deque([None]*bsize, bsize) self.timebuffer[i] = collections.deque([0]*bsize, bsize) self.x[i] = np.zeros(bsize, dtype=np.float) self.y[i] = np.zeros(bsize, dtype=np.float) _long_size=20 #self.data_series_buffer = collections.deque([0]*_long_size, _long_size) #self.time_series_buffer = collections.deque([0]*_long_size, _long_size) #self.data_series = [] * self.no_channels #self.time_series = [] * self.no_channels self.iflag_series = 0 #self.x = np.linspace(-timewindow, 0.0, self._bufsize) #self.x_series = np.zeros(_long_size, dtype=np.float) #self.y_series = np.zeros(_long_size, dtype=np.float) if title is not None: self.setTitle(str(title)) #self.pv_gateway[0].pv_name) self.showGrid(x=True, y=True) #self.setLabel('left', ylabel, self.pv_gateway[0].units) self.setLabel('bottom', 'time', 's') self.setBackground((60, 60, 60)) #247, 236, 249)) #self.setLimits(yMin=-0.11) #self.setLimits(yMin=-1, yMax=1) ax = pg.AxisItem('left') ax.enableAutoSIPrefix(enable=False) ax.setLabel(ylabel, self.pv_gateway[0].units) ax.setGrid(155) ay = pg.AxisItem('bottom') ay.enableAutoSIPrefix(enable=False) ay.setLabel('time', 'min') ay.setGrid(175) if 'BPM' in text_label: ax.setTickSpacing(0.2, 0.1) #ax.setRange(-1, 1) #ax.setParentItem(self.graphicsItem()) axitems = {} axitems['left'] = ax axitems['bottom'] = ay self.setAxisItems(axitems) pg.setConfigOption('leftButtonPan', False) self.plotItem.setMouseEnabled(y=True) # Only allow zoom in X-axis self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis #(125, 249, 255) if self.pen_color_idx == 0: pen_list = [ (255, 155, 0), (255,255,0), (0, 180, 255) ] elif self.pen_color_idx == 1: pen_list = [ (125, 249, 255), (255,255,0), (0, 180, 255) ] else: pen_list = [ (0, 180, 255), (125, 249, 255), (255,255,0) ] for i in range(0, len(self.pv_gateway)): self.curve[i] = self.plot(self.x[0], self.y[0], pen=pen_list[i]) # (0, 253, 235)) #self.curve[1] = self.plot(self.x[1], self.y[1], pen=(255,255,0)) #offset=(1.0, 1.0), l=pg.LegendItem() #horSpacing=20, verSpacing=0, labelTextColor=(205, 205, 205), #labelTextSize='6px', colCount=1) l.setParentItem(self.graphicsItem()) l.anchor((0,0), (0.08, 0.0)) #l.setLabelTextColor((205, 205, 205)) #l.setLabelTextSize(9) does not exists(!) #l.setOffset(-60) for curv, label in zip(self.curve, self.text_label): l.addItem(curv, label) #for curv, pv in zip(self.curve, self.pv_gateway): # l.addItem(curv, self.textpv.pv_name) #self.daq_stop() #print(self._bufsize) #print(len(self.x), len(self.y)) QApplication.processEvents() @Slot(object, int) def receive_monitor_dbr_time(self, pvdata, alarm_severity): #if not self.mutex.tryLock(): #locked(): # print("Event locked", pvdata.ts[0], pvdata.ts[1]) # return #self.mutex.lock() #acquire() _row = self.pv2item_dict[self.sender()] #print("row, value from pvdata==>", _row, pvdata.value[0], self.pv_gateway[_row].pv_name) ts_now = pvdata.ts[0] + pvdata.ts[1] * 10**(-9) ts_previous = (self.pvd_previous_list[_row].ts[0] + self.pvd_previous_list[_row].ts[1] * 10**(-9)) ts_delta = ts_now - ts_previous if (pvdata.ts[0] == self.pvd_previous_list[_row].ts[0]) and ( pvdata.ts[1] == self.pvd_previous_list[_row].ts[1]): #print("SAME TIMESTAMP") #pvdata.show() #self.pvd_previous_list[_row].show() #print("================") #self.mutex.unlock() #release() return if pvdata.ts[0] < self.pvd_previous_list[_row].ts[0]: print("Funny ts value", self.pv_gateway[_row].pv_name) pvdata.show() #self.mutex.unlock() #release() return value = pvdata.value[0] #discard first callbacks #if ts_delta > 2.0: # self.pvd_previous_list[_row] = _pvd # return; self.pvd_previous_list[_row] = pvdata self.val_previous[_row] = value #self.pvd_previous_list[_row].ts[0] = _pvd.ts[0] #self.pvd_previous_list[_row].ts[1] = _pvd.ts[1] self.databuffer[_row].append(value) self.timebuffer[_row].append(self.time_delta[_row]) highest_ts = self.timebuffer[0][0] \ if self.timebuffer[0][0] is not None else 0 for i in range(1, len(self.timebuffer)): if self.timebuffer[i][0] is None: continue elif self.timebuffer[i][0] > highest_ts: highest_ts = self.timebuffer[i][0] if self.timebuffer[_row][0] is not None: for i, val in enumerate(self.timebuffer[_row]): if val > highest_ts: self.idx[_row] = i - 1 break self.y[_row][:] = self.databuffer[_row] self.x[_row][:] = self.timebuffer[_row] idx = self.idx[_row] self.x_shifted[_row] = list(map(lambda m : (m - self.time_delta[_row]), self.x[_row][idx:])) #print("idx", self.idx) #if 'AVG' in self.pv_gateway[_row].pv_name: # for row in range(0, len(self.curve)): # self.curve[row].setData(self.x_shifted[row], self.y[row][idx:]) self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:]) self.time_delta[_row] = (( pvdata.ts[0] + pvdata.ts[1]*10**(-9)) - self.time_zero[0])/60 #self.mutex.unlock() #release() #QApplication.processEvents() @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): #self.pv_gateway.receive_monitor_update(value, status, alarm_severity) _row = self.pv2item_dict[self.sender()] #if _row == 1: # return print("row, value===>", _row, value, self.pv_gateway[_row].pv_name) _pvd = self.pv_gateway[_row].cafe.getPVCache( self.pv_gateway[_row].handle) #print("value", _pvd.value[0], self.pvd_previous_list[_row].value[0]) _pvd2 = self.pv_gateway[_row].pvd print ("val", value, _pvd2.value[0], _pvd.value[0], self.pvd_previous_list[_row].value[0]) ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9) ts_previous = (self.pvd_previous_list[_row].ts[0] + self.pvd_previous_list[_row].ts[1] * 10**(-9)) ts_delta = ts_now - ts_previous if value == self.val_previous[_row]: #if (_pvd.ts[0] == self.pvd_previous_list[_row].ts[0]) and ( # _pvd.ts[1] == self.pvd_previous_list[_row].ts[1]): _pvd.show() #self.pvd_previous_list[_row].show() return #discard first callbacks #if ts_delta > 2.0: # self.pvd_previous_list[_row] = _pvd # return; self.pvd_previous_list[_row] = _pvd2 self.val_previous[_row] = value #self.pvd_previous_list[_row].ts[0] = _pvd.ts[0] #self.pvd_previous_list[_row].ts[1] = _pvd.ts[1] self.databuffer[_row].append(value) self.timebuffer[_row].append(self.time_delta[_row]) highest_ts = self.timebuffer[0][0] \ if self.timebuffer[0][0] is not None else 0 for i in range(1, len(self.timebuffer)): if self.timebuffer[i][0] is None: continue elif self.timebuffer[i][0] > highest_ts: highest_ts = self.timebuffer[i][0] if self.timebuffer[_row][0] is not None: for i, val in enumerate(self.timebuffer[_row]): if val > highest_ts: self.idx[_row] = i - 1 break ''' for i in range(1, self.timebuffer): if self.timebuffer[i][0] is not None: a = self.timebuffer[0][0] for i, val in enumerate(self.timebuffer[_row]): if val > a: idx = i - 1 break ''' self.y[_row][:] = self.databuffer[_row] self.x[_row][:] = self.timebuffer[_row] #self.y[_row][:] = self.databuffer[_row] #self.x[_row][:] = self.timebuffer[_row] ''' #print(ts_delta, value, self.pvd_previous.value[0]) #if (ts_delta < self.ts_delta_max) and (value < self.pvd_previous.value[0]) : if (value < self.pvd_previous.value[0]) : self.data_series_buffer.append(value) self.time_series_buffer.append(ts_now - self.time_zero ) #self.time_delta) self.y_series[:] = self.data_series_buffer self.x_series[:] = self.time_series_buffer #print(self.x_series, self.y_series) #elif ts_delta < 1.0: if len(self.data_series_buffer) > 15: #x_series = np.array(self.time_series, dtype=np.float) #y_series = np.array(self.data_series, dtype=np.float) _x=self.x_series.reshape((-1, 1)) model = LinearRegression() model.fit(_x, self.y_series) r_sq = model.score(_x, self.y_series) ###JCprint('coefficient of determination:', r_sq, "slope", model.coef_ , "lifetime:", self.y_series[0]/model.coef_ / 3600) #print('intercept:', model.intercept_) #print('slope:', model.coef_) #print('max value', y_series[0], y_series[1]) if r_sq > 0.995: _I = self.y_series[0] ###JCprint("lifetime:", _I/model.coef_ / 3600) y_pred = model.predict(_x) #print("len, y_pred, _x", len(y_pred), len(self.y_series), len(_x)) #print('predicted response:', y_pred, sep='\n') m_sq_error = mean_squared_error(self.y_series, y_pred) #print('Mean squared error: {0:.9f}'.format( # mean_squared_error(y_series, y_pred))) #print('Coefficient of determination: {0:.9f}'.format( # r2_score(y_series, y_pred))) self.trigger_series_sequence.emit(self.x_series, self.y_series) #print("emit") self.data_series = [] self.time_series = [] #print(len(self.x_series), len(self.y_series)) else: self.data_series = [] self.time_series = [] ''' #dt = (self.x[-1] - self.x[-2]) #print("dt", dt) #Lowet IPCT before trigger is set to t=0 idx = self.idx[_row] self.x_shifted[_row] = list(map(lambda m : (m - self.time_delta[_row]), self.x[_row][idx:])) ##self.y = np.where(self.y != self.y, 0, self.y) #test for nan #print("row len len ", _row, self.time_delta[0], self.time_delta[1]) self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:]) self.time_delta[_row] = ( _pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero[0] ''' LOOK_BACK = -800 if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name: LOOK_BACK = -250 if value > self.y[-2]: if not self.found: #print(x_shifted[-240:], self.y[-240:]) #self.y = np.where(self.y != self.y, 0, self.y) #test for nan max_index = self.y[LOOK_BACK:].argmax() if max_index == 0: return print("max index=", max_index) #print(x_shifted[-600+max_index:], self.x[-600+max_index:]) #print(self.y[-600+max_index:-2]) self.found = True #print("Are Signals blocked??", self.signalsBlocked()) self.trigger_decay_sequence.emit(np.array( x_shifted[LOOK_BACK+max_index+9:-2]), self.y[LOOK_BACK+max_index+9:-2]) else: self.found = False ''' @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' print("pv_name==>", pv_name) _row = self.pv2item_dict[self.sender()] self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, post_display=False) #self.pv_gateway.receive_connect_update(handle, pv_name, status) _pvd = self.pv_gateway[_row].cafe.getPVCache(self.pv_gateway[_row].handle) if self.time_zero[_row] == 0: self.time_zero[_row] = _pvd.ts[0] + _pvd.ts[1]*10**(-9) self.pvd_previous = _pvd #renove highlighting which persists after mouse leaves def mouseMoveEvent(self, event): #event.ignore() pass def leaveEvent(self, event): self.clearFocus() del event class CAQPCTChart(PlotWidget): '''Channel access enabled pyqtgraph.PlotWidget''' #trigger_monitor_float = Signal(float, int, int) #trigger_monitor_int = Signal(int, int, int) #trigger_monitor_str = Signal(str, int, int) #trigger_connect = Signal(int, str, int) trigger_decay_sequence = Signal(np.ndarray, np.ndarray) trigger_series_sequence = Signal(np.ndarray, np.ndarray) #def py_connect_callback(self, handle, pvname, status): # self.trigger_connect.emit(int(handle), str(pvname), int(status)) # print("py connect callback", handle, pvname, status) def daq_start(self): self.blockSignals(False) def daq_pause(self): self.blockSignals(True) def daq_stop(self): self.blockSignals(True) def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'], monitor_callback=None, pv_within_daq_group: bool = False, color_mode=None, show_units: bool = False, prefix: str = "", suffix: str = "", notify_freq_hz: int = 0): super().__init__() self.found = False self.time_zero = 0 self.time_delta = 0 self.pv_list = pv_list self.pv2item_dict = {} self.pv_gateway = [None] * len(self.pv_list) self.pvd_previous = None for i in range(0, len(self.pv_list)): self.pv_gateway[i] = PVGateway( parent, pv_list[i], monitor_callback, pv_within_daq_group, color_mode, show_units, prefix, suffix, #connect_callback=self.py_connect_callback, connect_triggers=False, notify_freq_hz=notify_freq_hz) self.pv_gateway[i].is_initialize_complete() self.pv_gateway[i].trigger_connect.connect( self.receive_connect_update) self.pv_gateway[i].trigger_monitor_str.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_int.connect( self.receive_monitor_update) self.pv_gateway[i].trigger_monitor_float.connect( self.receive_monitor_update) self.pv_gateway[i].widget_class = "PlotWidget" self.pv2item_dict[self.pv_gateway[i]] = i self.cafe = self.pv_gateway[0].cafe self.cyca = self.pv_gateway[0].cyca for i in range(0, len(self.pv_gateway)): if self.cafe.isConnected(self.pv_gateway[i].pv_name): self.pv_gateway[i].trigger_connect.emit( self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name), self.pv_gateway[i].cyca.ICAFE_CS_CONN) for i in range(0, len(self.pv_gateway)): if not self.pv_gateway[i].pv_within_daq_group: self.pv_gateway[i].monitor_start() sampleinterval = 0.333 timewindow = 1800.0 self.ts_delta_max = 0.6 # Data stuff self._interval = int(sampleinterval*1000) self._bufsize = int(timewindow/sampleinterval) self.databuffer = collections.deque([None]*self._bufsize, self._bufsize) self.timebuffer = collections.deque([0]*self._bufsize, self._bufsize) _long_size = 20 self.data_series_buffer = collections.deque([0]*_long_size, _long_size) self.time_series_buffer = collections.deque([0]*_long_size, _long_size) self.data_series = [] self.time_series = [] self.iflag_series = 0 #self.x = np.linspace(-timewindow, 0.0, self._bufsize) self.x = np.zeros(self._bufsize, dtype=np.float) self.y = np.zeros(self._bufsize, dtype=np.float) self.x_series = np.zeros(_long_size, dtype=np.float) self.y_series = np.zeros(_long_size, dtype=np.float) self.setTitle("PCT(t)") #self.pv_gateway[0].pv_name) self.showGrid(x=True, y=True) self.setLabel('left', 'I', 'mA') self.setLabel('bottom', 'time', 's') self.setBackground((60, 60, 60)) #247, 236, 249)) self.setLimits(yMin=-0.11) self.plotItem.setMouseEnabled(y=False) # Only allow zoom in X-axis self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis self.curve = self.plot(self.x, self.y, pen=(125, 249, 255)) #self.curve2 = self.plot(self.x, self.y, pen=(255,255,0)) l = pg.LegendItem(offset=(0., 0.5)) l.setParentItem(self.graphicsItem()) l.setLabelTextColor((125, 249, 255)) l.addItem(self.curve, str(self.pv_gateway[0].pv_name)) ''' l2=self.addLegend() l2.setLabelTextColor('g') l2.setOffset(10) l2.addItem(self.curve2, str(self.pv_gateway[0].pv_name)) ''' self.daq_stop() print(self._bufsize) print(len(self.x), len(self.y)) @Slot(str, int, int) @Slot(int, int, int) @Slot(float, int, int) def receive_monitor_update(self, value, status, alarm_severity): #self.pv_gateway.receive_monitor_update(value, status, alarm_severity) _row = self.pv2item_dict[self.sender()] #print("value===>", value, self.pv_gateway[_row].pv_name) _pvd = self.pv_gateway[_row].cafe.getPVCache( self.pv_gateway[_row].handle) ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9) ts_previous = self.pvd_previous.ts[0] + self.pvd_previous.ts[1]*10**(-9) ts_delta = ts_now - ts_previous if (_pvd.ts[0] == self.pvd_previous.ts[0]) and ( _pvd.ts[1] == self.pvd_previous.ts[1]): #_pvd.show() return #discard first callbacks if ts_delta > 2.0: self.pvd_previous = _pvd return self.databuffer.append(value) self.y[:] = self.databuffer self.timebuffer.append(self.time_delta) self.x[:] = self.timebuffer #print(ts_delta, value, self.pvd_previous.value[0]) #if (ts_delta < self.ts_delta_max) and (value < # self.pvd_previous.value[0]): if value < self.pvd_previous.value[0]: self.data_series_buffer.append(value) self.time_series_buffer.append(ts_now - self.time_zero) self.y_series[:] = self.data_series_buffer self.x_series[:] = self.time_series_buffer #print(self.x_series, self.y_series) #elif ts_delta < 1.0: if len(self.data_series_buffer) > 15: #x_series = np.array(self.time_series, dtype=np.float) #y_series = np.array(self.data_series, dtype=np.float) _x = self.x_series.reshape((-1, 1)) model = LinearRegression() model.fit(_x, self.y_series) r_sq = model.score(_x, self.y_series) ###JCprint('coefficient of determination:', ###r_sq, "slope", model.coef_ , "lifetime:", ###self.y_series[0]/model.coef_ / 3600) #print('intercept:', model.intercept_) #print('slope:', model.coef_) #print('max value', y_series[0], y_series[1]) if r_sq > 0.995: #_I = self.y_series[0] ###JCprint("lifetime:", _I/model.coef_ / 3600) ####y_pred = model.predict(_x) #print("len, y_pred, _x", len(y_pred), len(self.y_series), # len(_x)) #print('predicted response:', y_pred, sep='\n') ##m_sq_error = mean_squared_error(self.y_series, y_pred) #print('Mean squared error: {0:.9f}'.format( # mean_squared_error(y_series, y_pred))) #print('Coefficient of determination: {0:.9f}'.format( # r2_score(y_series, y_pred))) self.trigger_series_sequence.emit(self.x_series, self.y_series) #print("emit") self.data_series = [] self.time_series = [] #print(len(self.x_series), len(self.y_series)) else: self.data_series = [] self.time_series = [] self.pvd_previous = _pvd #dt = (self.x[-1] - self.x[-2]) #print("dt", dt) #Lowet IPCT before trigger is set to t=0 x_shifted = list(map(lambda m: (m - self.time_delta), self.x)) ##self.y = np.where(self.y != self.y, 0, self.y) #test for nan self.curve.setData(x_shifted, self.y) self.time_delta = ( _pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero #x_shifted2= list(map(lambda m : m -self.time_delta-1 , self.x)) #self.curve2.setData(x_shifted2, self.y) #QApplication.processEvents() #print(type(x_shifted), type(self.y), type([1.1]), type(1.1)) LOOK_BACK = -800 if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name: LOOK_BACK = -250 if value > self.y[-2]: if not self.found: #print(x_shifted[-240:], self.y[-240:]) #self.y = np.where(self.y != self.y, 0, self.y) #test for nan max_index = self.y[LOOK_BACK:].argmax() if max_index == 0: return print("max index=", max_index) #print(x_shifted[-600+max_index:], self.x[-600+max_index:]) #print(self.y[-600+max_index:-2]) self.found = True #print("Are Signals blocked??", self.signalsBlocked()) self.trigger_decay_sequence.emit( np.array(x_shifted[LOOK_BACK+max_index+9:-2]), self.y[LOOK_BACK+max_index+9:-2]) else: self.found = False @Slot(int, str, int) def receive_connect_update(self, handle: int, pv_name: str, status: int): '''Triggered by connect signal''' print("pv_name==>", pv_name) _row = self.pv2item_dict[self.sender()] self.pv_gateway[_row].receive_connect_update(handle, pv_name, status, post_display=False) #self.pv_gateway.receive_connect_update(handle, pv_name, status) _pvd = self.pv_gateway[_row].cafe.getPVCache( self.pv_gateway[_row].handle) if self.time_zero == 0: self.time_zero = _pvd.ts[0] + _pvd.ts[1]*10**(-9) #print(self.time_zero) self.pvd_previous = _pvd #renove highlighting which persists after mouse leaves def mouseMoveEvent(self, event): pass def leaveEvent(self, event): self.clearFocus() del event