Files
caqtwidgets/pvwidgets.py

3221 lines
120 KiB
Python

''' Module with channel access enabled QtWidgets.'''
__author__ = 'Jan T. M. Chrin'
import collections
import numpy as np
import re
from threading import Lock
import time
from sklearn.linear_model import LinearRegression
from distutils.version import LooseVersion
from functools import reduce as func_reduce
from qtpy.QtCore import (QEventLoop, QMutex, QPoint, Qt, QThread, QTimer,
Signal, Slot)
from qtpy.QtGui import (QCloseEvent, QColor, QCursor, QFont, QFontMetricsF,
QIcon, QKeySequence)
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtWidgets import (QAbstractItemView, QAbstractSpinBox, QAction,
QApplication, QBoxLayout, QCheckBox, QComboBox,
QDialog, QDockWidget, QDoubleSpinBox, QFrame,
QGroupBox, QHBoxLayout, QLabel, QLineEdit,
QListWidget, QMenu, QMessageBox, QPushButton,
QSpinBox, QStyle, QStyleOptionSpinBox, QTableWidget,
QTableWidgetItem, QVBoxLayout, QWidget)
import pyqtgraph as pg
from pyqtgraph import PlotWidget
from caqtwidgets.pvgateway import PVGateway
class QTaggedLineEdit(QWidget):
    '''Composite widget: a QLabel tag paired with a QLineEdit.

    The label is exposed as ``self.label``, the editor as ``self.line_edit``
    and the initial text (as a string) as ``self.parameter``.
    '''

    def __init__(self, label_text="", value="",
                 position="LEFT", parent=None):
        '''Create the label/line-edit pair and lay them out.

        :param label_text: text shown in the tag label.
        :param value: initial content of the line edit; converted with str().
        :param position: "LEFT" lays the pair out left-to-right, any other
            value stacks them top-to-bottom.
        :param parent: optional parent widget.
        '''
        super().__init__(parent)
        self.parameter = str(value)

        self.label = QLabel(label_text)
        self.label.setObjectName("Tagged")
        self.label.setFixedHeight(24)
        self.label.setContentsMargins(10, 0, 0, 0)

        self.line_edit = QLineEdit(self.parameter)
        self.line_edit.setObjectName("Write")
        self.line_edit.setFixedHeight(24)

        # Width of the initial text in a 16pt sans-serif font plus padding.
        # QFontMetricsF returns a float, but setMaximumWidth() is int-typed;
        # recent Python/Qt binding combinations no longer convert floats
        # implicitly, so truncate explicitly.
        fm = QFontMetricsF(QFont("sans serif", 16))
        self.line_edit.setMaximumWidth(int(fm.width(self.parameter) + 20))
        self.label.setBuddy(self.line_edit)

        if position == "LEFT":
            direction = QBoxLayout.LeftToRight
        else:
            direction = QBoxLayout.TopToBottom
        layout = QBoxLayout(direction)
        layout.addWidget(self.label)
        layout.addWidget(self.line_edit)
        layout.addStretch()
        layout.setSpacing(2)
        layout.setContentsMargins(0, 0, 0, 0)
        self.setLayout(layout)
class QHLine(QFrame):
    '''Frame rendered as a sunken horizontal line.'''

    def __init__(self):
        super().__init__()
        # Shape and shadow are independent frame-style settings.
        self.setFrameShadow(QFrame.Sunken)
        self.setFrameShape(QFrame.HLine)
class QVLine(QFrame):
    '''Frame rendered as a sunken vertical line.'''

    def __init__(self):
        super().__init__()
        # Shape and shadow are independent frame-style settings.
        self.setFrameShadow(QFrame.Sunken)
        self.setFrameShape(QFrame.VLine)
class AppQLineEdit(QLineEdit):
    '''QLineEdit that releases keyboard focus when the mouse leaves it.'''

    def __init__(self, parent=None):
        '''Initialise the underlying QLineEdit.

        :param parent: optional parent widget.
        '''
        # The base-class initialiser must run: without it the wrapped Qt
        # object is never created and any later call on the widget fails.
        super().__init__(parent)

    def leaveEvent(self, event):
        '''Clear focus as soon as the pointer leaves the widget.'''
        self.clearFocus()
        del event
class CAQLineEdit(QLineEdit, PVGateway):
    '''Channel access enabled QLineEdit widget.

    Combines QLineEdit with PVGateway.  The slots defined here only forward
    to the PVGateway implementation of the same name.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)
    trigger_daq = Signal(object, str, int)
    trigger_daq_int = Signal(object, str, int)
    trigger_daq_str = Signal(object, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units: bool = False, prefix: str = "", suffix: str = "",
                 notify_freq_hz: int = 0, precision: int = 0):
        '''Initialise the base classes, size the widget and start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.  A
        monitor is started here only when the PV is not part of a DAQ group.
        '''
        super().__init__(parent, pv_name, monitor_callback,
                         pv_within_daq_group, color_mode, show_units, prefix,
                         suffix, connect_callback=self.py_connect_callback,
                         notify_freq_hz=notify_freq_hz, precision=precision)
        self.is_initialize_complete()
        self.configure_widget()
        if not self.pv_within_daq_group:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''Callback function to be invoked on change of pv connection status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(object, str, int)
    def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
        '''Forward a DAQ update to PVGateway.'''
        PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state)

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Fix the widget size from self.suggested_text and set Qt properties.
        '''
        self.setFocusPolicy(Qt.NoFocus)
        fm = QFontMetricsF(QFont("Sans Serif", 10))
        qrect = fm.boundingRect(self.suggested_text)
        _width_scaling_factor = 1.15
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
        if self.pv_within_daq_group:
            self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA)
        else:
            self.qt_property_initial_values(qt_object_name=self.PV_READBACK)

    # Remove highlighting which persists after mouse leaves
    def mouseMoveEvent(self, event):
        '''Swallow mouse-move events (base-class handler is not called).'''
        pass

    def leaveEvent(self, event):
        '''Clear focus when the pointer leaves the widget.'''
        self.clearFocus()
        del event
class CAQLabel(QLabel, PVGateway):
    '''Channel access enabled QLabel widget.

    Combines QLabel with PVGateway.  The slots defined here only forward to
    the PVGateway implementation of the same name.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)
    trigger_daq = Signal(object, str, int)
    trigger_daq_int = Signal(object, str, int)
    trigger_daq_str = Signal(object, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units: bool = False, prefix: str = "", suffix: str = "",
                 notify_freq_hz: int = 0, precision: int = 0):
        '''Initialise the base classes, size the widget and start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.  A
        monitor is started here only when pv_within_daq_group is False.
        '''
        super().__init__(parent, pv_name, monitor_callback,
                         pv_within_daq_group, color_mode, show_units, prefix,
                         suffix, connect_callback=self.py_connect_callback,
                         notify_freq_hz=notify_freq_hz, precision=precision)
        self.is_initialize_complete()
        self.configure_widget()
        if self.pv_within_daq_group is False:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''Callback function to be invoked on change of
        pv connection status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(object, str, int)
    def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
        '''Forward a DAQ update to PVGateway.'''
        PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state)

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Fix the widget size from self.suggested_text and set Qt properties.
        '''
        self.setFocusPolicy(Qt.NoFocus)
        fm = QFontMetricsF(QFont("Sans Serif", 10))
        qrect = fm.boundingRect(self.suggested_text)
        _width_scaling_factor = 1.15
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
        if self.pv_within_daq_group:
            self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA)
        else:
            self.qt_property_initial_values(qt_object_name=self.PV_READBACK)
#For use with CAQMenu
class QLineEditExtended(QLineEdit):
    '''QLineEdit that hands mouse presses over to the widget it is embedded in.

    The owning widget is stored in ``self.parent`` (note that this attribute
    replaces the inherited ``parent()`` accessor on the instance).
    '''

    def __init__(self, parent=None):
        super().__init__(parent)
        self.parent = parent

    def mousePressEvent(self, event):
        '''Right click opens the owner's context menu; left click is forwarded
        to the owner's mousePressEvent.  Other buttons are ignored.
        '''
        pressed = event.button()
        if pressed == Qt.RightButton:
            self.parent.showContextMenu()
            return
        if pressed == Qt.LeftButton:
            self.parent.mousePressEvent(event)
class CAQMenu(QComboBox, PVGateway):
    '''Channel access enabled QMenu widget

    A QComboBox whose items are the enum strings of the channel; selecting
    an item writes its index to the channel.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units=False, prefix: str = "", suffix: str = ""):
        '''Initialise the base classes, populate the menu, start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.
        '''
        super().__init__(parent, pv_name, monitor_callback,
                         pv_within_daq_group, color_mode, show_units, prefix,
                         suffix, connect_callback=self.py_connect_callback)
        self.is_initialize_complete()
        self.configure_widget()
        # Connected only after configure_widget(), so that populating the
        # items there does not invoke value_change().
        self.currentIndexChanged.connect(self.value_change)
        if self.pv_within_daq_group is False:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''Callback function to be invoked on change of
        pv connection status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    def configure_widget(self):
        '''Fill the combo box with the channel's enum strings and size it.'''
        self.previousIndex = None
        self.setFocusPolicy(Qt.NoFocus)
        # An editable combo with a read-only line edit allows the displayed
        # text to be centred and mouse presses to be intercepted.
        self.setEditable(True)
        self.setLineEdit(QLineEditExtended(self))
        self.lineEdit().setReadOnly(True)
        self.lineEdit().setAlignment(Qt.AlignCenter)
        enumStringList = self.cafe.getEnumStrings(self.handle)
        self.addItems(enumStringList)
        for i in range(self.count()):
            self.setItemData(i, Qt.AlignCenter, Qt.TextAlignmentRole)
        fm = QFontMetricsF(QFont("Sans Serif", 10))
        qrect = fm.boundingRect(self.suggested_text)
        _width_scaling_factor = 1.1
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int((qrect.width() + 40) * _width_scaling_factor))
        self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)

    def post_display_value(self, value):
        '''Convert value to index

        A str is mapped to its enum index via cafe; int and float values are
        used as the index directly.  Signals are blocked (Qt >= 5.3) so the
        update does not trigger value_change().
        '''
        if not hasattr(self, "setCurrentIndex"):
            print(("ERROR: overloaded post_display_value: 'setCurrentIndex' "
                   "method does not exist!"))
            return
        block = LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3")
        if block:
            self.blockSignals(True)
        if isinstance(value, str):
            self.setCurrentIndex(
                self.cafe.getEnumFromString(self.handle, value))
        elif isinstance(value, int):
            self.setCurrentIndex(value)
        elif isinstance(value, float):
            # Should not happen
            self.setCurrentIndex(int(value))
        if block:
            self.blockSignals(False)

    def value_change(self, indx):
        '''Write the selected index to the channel.

        On failure the displayed index is reverted to the cached channel
        value (or, if none, to the index recorded at the last mouse press)
        and the error is reported in a message box.
        '''
        status = self.cafe.set(self.handle, indx)
        if status == self.cyca.ICAFE_NORMAL:
            return
        value = self.cafe.getCache(self.handle, 'int')
        block = LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3")
        if block:
            self.blockSignals(True)
        if value is not None:
            self.setCurrentIndex(value)
        elif self.previousIndex is not None:
            self.setCurrentIndex(self.previousIndex)
        if block:
            self.blockSignals(False)
        self.pv_message_in_a_box.setText(
            "CAQMenu set operation reports error:\n{0}".format(
                self.cafe.getStatusCodeAsString(status)))
        self.pv_message_in_a_box.exec()

    def mousePressEvent(self, event):
        '''Handle context-menu clicks and block presses on read-only channels.
        '''
        # NOTE(review): statement nesting reconstructed from a flattened
        # source; confirm that the QComboBox handler is meant to run after
        # the right-button branch as well.
        button = event.button()
        if button == Qt.RightButton:
            PVGateway.mousePressEvent(self, event)
        elif self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                event.ignore()
                return
        QComboBox.mousePressEvent(self, event)
        # Remembered as the fallback for value_change() on a failed set.
        self.previousIndex = self.currentIndex()

    def enterEvent(self, event):
        '''Mark every item as forbidden while hovering a read-only channel.'''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                for i in range(self.count()):
                    self.setItemIcon(i, QIcon(":/forbidden.png"))
                self.setStyleSheet(
                    ("QComboBox {background: transparent}" +
                     "QComboBox::drop-down {image: url(:/forbidden.png)}"))

    def leaveEvent(self, event):
        '''Remove the forbidden markers set by enterEvent.'''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                for i in range(self.count()):
                    self.setItemIcon(i, QIcon())
                self.setStyleSheet(
                    "QComboBox::drop-down {background: transparent}")

    # The widget only accepts wheel events if it already has the focus,
    # so that scrolling past it does not change the selection.
    def wheelEvent(self, event):
        '''Ignore wheel events unless the widget has focus.'''
        if self.hasFocus() is False:
            event.ignore()
        else:
            QComboBox.wheelEvent(self, event)

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Triggered by monitor signal'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)
class CAQMessageButton(QPushButton, PVGateway):
    '''Channel access enabled QPushButton.

    Writes msg_press_value to the channel when the button is pressed and
    msg_release_value when it is released; either may be None to disable
    the corresponding write.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 notify_freq_hz: int = 0,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units=False, msg_label: str = "",
                 msg_press_value=None, msg_release_value=None,
                 start_monitor=False):
        '''Initialise the base classes and wire the press/release actions.

        :param msg_label: button text; also used to size the button.
        :param msg_press_value: value written on press (None: no write).
        :param msg_release_value: value written on release (None: no write).
        :param start_monitor: start a monitor, provided the PV is not within
            a DAQ group.

        The remaining arguments are forwarded to the base-class initialiser.
        '''
        super().__init__(parent=parent, pv_name=pv_name,
                         monitor_callback=monitor_callback,
                         notify_freq_hz=notify_freq_hz,
                         pv_within_daq_group=pv_within_daq_group,
                         color_mode=color_mode, show_units=show_units,
                         msg_label=msg_label,
                         connect_callback=self.py_connect_callback)
        self.msg_press_value = msg_press_value
        self.msg_release_value = msg_release_value
        if self.msg_press_value is not None:
            self.pressed.connect(self.act_on_pressed)
        if self.msg_release_value is not None:
            self.released.connect(self.act_on_released)
        self.msg_label = msg_label
        # Left-pad the label with three filler characters; the padded text
        # is only used by configure_widget() to measure the button width.
        self.suggested_text = self.msg_label
        _suggested_text_length = len(self.suggested_text) + 3
        self.suggested_text = self.suggested_text.rjust(
            _suggested_text_length, "^")
        self.configure_widget()
        self.msg_press_status = self.cyca.ICAFE_NORMAL
        self.msg_release_status = self.cyca.ICAFE_NORMAL
        # Error report buffer; always starts with the PV name.
        self.msg_report_status = "PV={0}\n".format(self.pv_name)
        self.msg_has_error = False
        if not self.pv_within_daq_group and start_monitor:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''Callback function to be invoked on change of
        pv connection status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Set the label text, fix the button size and set Qt properties.'''
        self.setFocusPolicy(Qt.StrongFocus)
        self.setCheckable(True)  # Recognizes press and release states
        fm = QFontMetricsF(QFont("Sans Serif", 10))
        qrect = fm.boundingRect(self.suggested_text)
        _width_scaling_factor = 1.0
        self.setText(self.msg_label)
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
        self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)

    def enterEvent(self, event):
        '''Flag the button as readOnly while hovering a non-writable channel.
        '''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                self.setProperty("readOnly", True)
                self.qt_style_polish()

    def leaveEvent(self, event):
        '''Re-apply the dynamic style property after a read-only hover.'''
        if self.property("readOnly"):
            self.setProperty(self.qt_dynamic_property_get(), True)
            self.qt_style_polish()

    def mouseReleaseEvent(self, event):
        '''Delay briefly before the release is processed when a release
        value is configured.
        '''
        # NOTE(review): presumably separates the press and release writes in
        # time; the sleep blocks the GUI thread for 0.1 s.
        if self.msg_release_value is not None:
            time.sleep(0.1)
        QPushButton.mouseReleaseEvent(self, event)

    def mousePressEvent(self, event):
        '''Accept presses only on writable channels; right click is passed
        to PVGateway.
        '''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 1:
                QPushButton.mousePressEvent(self, event)
        if event.button() == Qt.RightButton:
            PVGateway.mousePressEvent(self, event)

    def act_on_pressed(self):
        '''Write msg_press_value to the channel; report a failed write.'''
        if self.msg_press_value is None:
            return
        self.msg_press_status = self.cafe.set(self.handle,
                                              self.msg_press_value)
        if self.msg_press_status != self.cyca.ICAFE_NORMAL:
            self.msg_report_status += (
                "Error in set operation (at press button):\n{0}\n".format(
                    self.cafe.getStatusCodeAsString(self.msg_press_status)))
            self.msg_has_error = True
            # NOTE(review): nesting reconstructed from a flattened source.
            # The same report is shown again by act_on_released(); confirm
            # this immediate dialog is intended.
            qm = QMessageBox()
            qm.setText(self.msg_report_status)
            qm.exec()
            QApplication.processEvents()

    def act_on_released(self):
        '''Write msg_release_value to the channel and show any accumulated
        press/release errors.
        '''
        if self.msg_release_value is not None:
            self.msg_release_status = self.cafe.set(self.handle,
                                                    self.msg_release_value)
            if self.msg_release_status != self.cyca.ICAFE_NORMAL:
                self.msg_report_status += (
                    "Error in set operation (at release button):\n{0}\n"
                    .format(self.cafe.getStatusCodeAsString(
                        self.msg_release_status)))
                self.msg_has_error = True
        if self.msg_has_error:
            self.msg_has_error = False
            self.pv_message_in_a_box.setText(self.msg_report_status)
            self.pv_message_in_a_box.exec()
            # Reset the report buffer for the next press/release cycle.
            self.msg_report_status = "PV={0}\n".format(self.pv_name)
            # NOTE(review): nesting reconstructed from a flattened source.
            # This second dialog shows only the freshly reset buffer (the PV
            # name); confirm it is intended.
            qm = QMessageBox()
            qm.setText(self.msg_report_status)
            qm.exec()
            QApplication.processEvents()
class CAQTextEntry(QLineEdit, PVGateway):
    '''Channel access enabled QTextEntry widget

    The entered text is written to the channel when Return is pressed.
    ``self.currentText`` holds the text last applied through setText().
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units=False, prefix: str = "", suffix: str = ""):
        '''Initialise the base classes, size the widget and start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.
        '''
        super().__init__(parent, pv_name, monitor_callback,
                         pv_within_daq_group, color_mode, show_units, prefix,
                         suffix, connect_callback=self.py_connect_callback)
        self.is_initialize_complete()  # waits a fraction of a second
        self.currentText = ""
        self.returnPressed.connect(self.valuechange)
        self.configure_widget()
        if self.pv_within_daq_group is False:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''Callback function to be invoked on change of
        pv connection status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Fix the widget size from self.suggested_text and set Qt properties.
        '''
        self.setFocusPolicy(Qt.StrongFocus)
        fm = QFontMetricsF(QFont("Sans Serif", 12))
        qrect = fm.boundingRect(self.suggested_text)
        _width_scaling_factor = 1.15
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int((qrect.width() + 10) * _width_scaling_factor))
        self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)

    def valuechange(self):
        '''Write the current text to the channel.

        If the write fails, the display is restored from the channel (a
        fresh get when no monitor is running, otherwise the cached value).
        '''
        status = self.cafe.set(self.handle, self.text())
        no_monitor = self.cafe.getNoMonitors(self.handle) == 0
        if status == self.cyca.ICAFE_NORMAL:
            # Do this for TextInfo cache
            if no_monitor:
                self.cafe.get(self.handle, 'native')
            return
        if no_monitor:
            val = self.cafe.get(self.handle, 'native')
        else:
            val = self.cafe.getCache(self.handle, 'native')
        if val is None:
            return
        if isinstance(val, str):
            strText = val
        else:
            # Fixed-point formatting with self.precision decimals.
            valStr = ("{: .%sf}" % self.precision)
            strText = valStr.format(round(val, self.precision))
            print(strText, " precision ", self.precision)
        self.setText(strText)

    def setText(self, value):
        '''Set the text and remember it in self.currentText.'''
        QLineEdit.setText(self, value)
        self.currentText = self.text()

    def enterEvent(self, event):
        '''Make the entry read-only while hovering a non-writable channel and
        allow it to take focus.
        '''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                self.setProperty("readOnly", True)
                self.qt_style_polish()
                self.setReadOnly(True)
        self.setFocusPolicy(Qt.StrongFocus)

    def leaveEvent(self, event):
        '''Undo the read-only hover state, discard unsubmitted edits and
        release focus.
        '''
        if self.isReadOnly():
            self.setReadOnly(False)
            self.setProperty(self.qt_dynamic_property_get(), True)
            self.qt_style_polish()
        if self.text() != self.currentText:
            # Text typed but never submitted: restore the last applied text.
            QLineEdit.setText(self, self.currentText)
            self.setCursorPosition(100)
        self.clearFocus()
        self.setFocusPolicy(Qt.NoFocus)
        del event

    def mousePressEvent(self, event):
        '''Right click goes to PVGateway; any other press places the text
        cursor at the click position.
        '''
        if event.button() == Qt.RightButton:
            PVGateway.mousePressEvent(self, event)
            self.clearFocus()
            return
        local_event_position = QPoint(event.x(), event.y())
        local_cursor_position = self.cursorPositionAt(local_event_position)
        self.setCursorPosition(local_cursor_position)
class CAQSpinBox(QSpinBox, PVGateway):
    '''Channel access enabled QSpinBox widget

    Every value change is written to the channel; a failed write reverts the
    display to the last known value.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units=False, prefix: str = "", suffix: str = ""):
        '''Initialise the base classes, size the widget and start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.
        '''
        super().__init__(parent, pv_name, monitor_callback,
                         pv_within_daq_group, color_mode, show_units, prefix,
                         suffix, connect_callback=self.py_connect_callback)
        self.is_initialize_complete()
        self.valueChanged.connect(self.value_change)
        self.configure_widget()
        if not self.pv_within_daq_group:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''
        Callback function to be invoked on change of pv connection
        status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Set up the line edit, suffix, fixed size and control-limit range.
        '''
        self.previousValue = None
        self.currentValue = None
        self.setFocusPolicy(Qt.StrongFocus)
        self.setButtonSymbols(QAbstractSpinBox.UpDownArrows)
        self.setAccelerated(False)
        self.setLineEdit(QLineEditExtended(self))
        self.lineEdit().setEnabled(True)
        self.lineEdit().setReadOnly(False)
        self.lineEdit().setAlignment(Qt.AlignLeft)
        self.lineEdit().setFont(QFont("Sans Serif", 16))
        fm = QFontMetricsF(QFont("Sans Serif", 12))
        # _added_text becomes the displayed suffix; _suggested_text is only
        # used to measure the width the widget needs.
        _suggested_text = self.max_control_abs_str
        _added_text = ""
        if self.show_units:
            _added_text += " " + self.units
            _suggested_text += self.units
        if self.suffix:
            _added_text += " " + self.suffix
            _suggested_text += self.suffix
        self.setSuffix(_added_text)
        qrect = fm.boundingRect(_suggested_text)
        _width_scaling_factor = 1.0
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
        self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
        if self.pv_ctrl is not None:
            self.setRange(int(self.pv_ctrl.lowerControlLimit),
                          int(self.pv_ctrl.upperControlLimit))

    def post_display_value(self, value):
        '''Display a value rounded to the nearest integer.

        Signals are blocked (Qt >= 5.3) so that the update does not trigger
        value_change().
        '''
        if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
            self.blockSignals(True)
            self.setValue(int(round(value)))
            self.blockSignals(False)
        else:
            self.setValue(int(round(value)))

    def mousePressEvent(self, event):
        '''Step the value on an arrow click (connected channels only), place
        the text cursor, then pass the event to PVGateway.
        '''
        # NOTE(review): statement nesting reconstructed from a flattened
        # source; confirm against the intended behaviour.
        _opt = QStyleOptionSpinBox()
        self.initStyleOption(_opt)
        _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
                                               QStyle.SC_SpinBoxUp, self)
        _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
                                                 QStyle.SC_SpinBoxDown, self)
        # Remembered so value_change() can revert after a failed set.
        self.previousValue = self.value()
        if event.button() == Qt.LeftButton:
            if _rect_up.contains(event.pos(), proper=True) or \
                    _rect_down.contains(event.pos(), proper=True):
                if not self.cafe.isConnected(self.handle):
                    self.pv_message_in_a_box.setText(
                        ("Spinbox change value events currently suspended\n" +
                         "as channel {0} is disconnected.").format(
                             self.pv_name))
                    self.pv_message_in_a_box.exec()
                    return
                QSpinBox.mousePressEvent(self, event)
                # Clear Focus: only one step per mouse click.
                self.clearFocus()
        local_event_position = QPoint(event.x(), event.y())
        local_cursor_position = self.lineEdit().cursorPositionAt(
            local_event_position)
        self.lineEdit().setCursorPosition(local_cursor_position)
        PVGateway.mousePressEvent(self, event)

    def setValue(self, intVal):
        '''Set the value and remember the result in self.currentValue.'''
        QSpinBox.setValue(self, intVal)
        self.currentValue = self.value()

    def _revert_to_last_known_value(self):
        '''Restore previousValue or, if none is recorded, the cached channel
        value.
        '''
        if self.previousValue is not None:
            self.setValue(self.previousValue)
            return
        _value = self.cafe.getCache(self.handle, 'int')
        if _value is not None:
            self.setValue(_value)

    def value_change(self, intVal):
        '''Write the new value to the channel; revert and report on failure.

        With Qt >= 5.3 the revert runs with signals blocked and the error is
        shown in a message box; otherwise it is shown in the parent's status
        bar.
        '''
        status = self.cafe.set(self.handle, intVal)
        if status == self.cyca.ICAFE_NORMAL:
            return
        if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
            self.blockSignals(True)
            self._revert_to_last_known_value()
            self.blockSignals(False)
            self.pv_message_in_a_box.setText(
                ("Spinbox set operation reports error:\n{0}"
                 .format(self.cafe.getStatusCodeAsString(status))))
            self.pv_message_in_a_box.exec()
        else:
            self._revert_to_last_known_value()
            self.parent.statusbar.showMessage(
                (self.widget_class + " " +
                 self.cafe.getStatusCodeAsString(status)))

    def enterEvent(self, event):
        '''Make the box read-only while hovering a non-writable channel and
        allow it to take focus.
        '''
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                self.setProperty("readOnly", True)
                self.qt_style_polish()
                self.setReadOnly(True)
        self.setFocusPolicy(Qt.StrongFocus)

    def leaveEvent(self, event):
        '''Undo the read-only hover state and release focus.'''
        if self.isReadOnly():
            self.setReadOnly(False)
            self.setProperty(self.qt_dynamic_property_get(), True)
            self.qt_style_polish()
        self.clearFocus()
        self.setFocusPolicy(Qt.NoFocus)
        del event

    def keyPressEvent(self, event):
        '''Return/Enter submits and releases focus; Up/Down step normally;
        any other key is processed with signals blocked (Qt >= 5.3) so that
        partially typed values are not written.
        '''
        key = event.key()
        if key in (Qt.Key_Return, Qt.Key_Enter):
            QSpinBox.keyPressEvent(self, event)
            self.clearFocus()
        elif key in (Qt.Key_Up, Qt.Key_Down):
            QSpinBox.keyPressEvent(self, event)
        else:
            block = LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3")
            if block:
                self.blockSignals(True)
            QSpinBox.keyPressEvent(self, event)
            if block:
                self.blockSignals(False)

    # The spin box should only accept wheel events if it already has the
    # focus. This is accomplished by reimplementing QWidget.wheelEvent
    # within a QSpinBox subclass:
    def wheelEvent(self, event):
        '''Ignore wheel events unless the widget has focus.'''
        if self.hasFocus() is False:
            event.ignore()
        else:
            QSpinBox.wheelEvent(self, event)
class CAQDoubleSpinBox(QDoubleSpinBox, PVGateway):
    '''Channel access enabled QDoubleSpinBox widget

    Every value change is written to the channel; a failed write reverts the
    display to the last known value.
    '''
    trigger_monitor_float = Signal(float, int, int)
    trigger_monitor_int = Signal(int, int, int)
    trigger_monitor_str = Signal(str, int, int)
    trigger_monitor = Signal(object, int)
    trigger_connect = Signal(int, str, int)

    def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
                 pv_within_daq_group: bool = False, color_mode=None,
                 show_units: bool = False, prefix: str = "", suffix: str = ""):
        '''Initialise the base classes, size the widget and start monitoring.

        All arguments are forwarded unchanged to the base-class initialiser;
        py_connect_callback is registered as the connection callback.
        '''
        super().__init__(parent=parent, pv_name=pv_name,
                         monitor_callback=monitor_callback,
                         pv_within_daq_group=pv_within_daq_group,
                         color_mode=color_mode, show_units=show_units,
                         prefix=prefix, suffix=suffix,
                         connect_callback=self.py_connect_callback)
        self.is_initialize_complete()
        self.valueChanged.connect(self.valuechange)
        self.configure_widget()
        if self.pv_within_daq_group is False:
            self.monitor_start()

    def py_connect_callback(self, handle, pvname, status):
        '''
        Callback function to be invoked on change of pv connection
        status.

        Re-emits the arguments through the trigger_connect signal.
        '''
        self.trigger_connect.emit(int(handle), str(pvname), int(status))

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Forward a monitor update to PVGateway.'''
        PVGateway.receive_monitor_update(self, value, status, alarm_severity)

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        PVGateway.receive_connect_update(self, handle, pv_name, status)

    def configure_widget(self):
        '''Set up the line edit, step size, suffix, fixed size and range.'''
        self.previousValue = None
        self.currentValue = None
        self.setFocusPolicy(Qt.StrongFocus)
        self.setButtonSymbols(QAbstractSpinBox.UpDownArrows)
        self.setAccelerated(False)
        self.setLineEdit(QLineEditExtended(self))
        self.lineEdit().setReadOnly(False)
        self.lineEdit().setAlignment(Qt.AlignRight)
        self.lineEdit().setFont(QFont("Sans Serif", 12))
        # One step changes the last displayed decimal place.
        _stepsize = 10**(self.precision * -1)
        self.setSingleStep(_stepsize)
        self.setDecimals(self.precision)
        fm = QFontMetricsF(QFont("Sans Serif", 12))
        # _added_text becomes the displayed suffix; _suggested_text is only
        # used to measure the width the widget needs.
        _suggested_text = self.suggested_text
        _added_text = ""
        if self.show_units:
            _added_text += " " + self.units
            _suggested_text += self.units
        if self.suffix:
            _added_text += " " + self.suffix
            _suggested_text += self.suffix
        self.setSuffix(_added_text)
        qrect = fm.boundingRect(_suggested_text)
        _width_scaling_factor = 1.15
        # Font metrics are floats; the fixed-size setters are int-typed and
        # reject floats on recent Python/Qt bindings, so truncate explicitly.
        self.setFixedHeight(int(fm.lineSpacing() * 1.8))
        self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
        self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
        if self.pv_ctrl is not None:
            # Keep the limits as floats: truncating to int would shrink (or
            # collapse) the range of channels with fractional limits.
            self.setRange(float(self.pv_ctrl.lowerControlLimit),
                          float(self.pv_ctrl.upperControlLimit))

    def post_display_value(self, value):
        '''set value from monitor

        Signals are blocked (Qt >= 5.3) so that the update does not trigger
        valuechange().
        '''
        if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
            self.blockSignals(True)
            self.setValue(value)
            self.blockSignals(False)
        else:
            self.setValue(value)

    def mousePressEvent(self, event):
        '''Step the value on an arrow click (connected channels only), place
        the text cursor, then pass the event to PVGateway.
        '''
        # NOTE(review): statement nesting reconstructed from a flattened
        # source; confirm against the intended behaviour.
        _opt = QStyleOptionSpinBox()
        self.initStyleOption(_opt)
        _rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
                                               QStyle.SC_SpinBoxUp, self)
        _rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
                                                 QStyle.SC_SpinBoxDown, self)
        # Remembered so valuechange() can revert after a failed set.
        self.previousValue = self.value()
        if event.button() == Qt.LeftButton:
            if _rect_up.contains(event.pos(), proper=False) or \
                    _rect_down.contains(event.pos(), proper=False):
                if not self.cafe.isConnected(self.handle):
                    self.pv_message_in_a_box.setText(
                        ("Spinbox change value events currently suspended\n" +
                         "as channel {0} is disconnected.").format(
                             self.pv_name))
                    self.pv_message_in_a_box.exec()
                    return
                QDoubleSpinBox.mousePressEvent(self, event)
        local_event_position = QPoint(event.x(), event.y())
        local_cursor_position = self.lineEdit().cursorPositionAt(
            local_event_position)
        self.lineEdit().setCursorPosition(local_cursor_position)
        PVGateway.mousePressEvent(self, event)

    def mouseReleaseEvent(self, event):
        '''Release focus on mouse release (base handler is not called).'''
        self.clearFocus()

    def setValue(self, value):
        '''Set the value and remember the result in self.currentValue.'''
        QDoubleSpinBox.setValue(self, value)
        # Recorded after the set so that it reflects the value now shown
        # (same order as CAQSpinBox.setValue).
        self.currentValue = self.value()

    def _revert_to_last_known_value(self):
        '''Restore previousValue or, if none is recorded, the cached channel
        value.
        '''
        if self.previousValue is not None:
            self.setValue(self.previousValue)
            return
        _value = self.cafe.getCache(self.handle, 'float')
        if _value is not None:
            self.setValue(_value)

    def valuechange(self, fval):
        '''Write the new value to the channel; revert and report on failure.

        With Qt >= 5.3 the revert runs with signals blocked and the error is
        shown in a message box; otherwise it is shown in the parent's status
        bar.
        '''
        status = self.cafe.set(self.handle, fval)
        if status == self.cyca.ICAFE_NORMAL:
            return
        if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
            self.blockSignals(True)
            self._revert_to_last_known_value()
            self.blockSignals(False)
            self.pv_message_in_a_box.setText(
                ("Spinbox set operation reports error:\n{0}"
                 .format(self.cafe.getStatusCodeAsString(status))))
            self.pv_message_in_a_box.exec()
        else:
            self._revert_to_last_known_value()
            self.parent.statusbar.showMessage(
                (self.widget_class + " " +
                 self.cafe.getStatusCodeAsString(status)))

    def enterEvent(self, event):
        '''Allow focus and make the box read-only while hovering a
        non-writable channel.
        '''
        self.setFocusPolicy(Qt.StrongFocus)
        if self.pv_info is not None:
            if self.pv_info.accessWrite == 0:
                self.setProperty("readOnly", True)
                self.qt_style_polish()
                self.setReadOnly(True)

    def leaveEvent(self, event):
        '''Undo the read-only hover state and release focus.'''
        if self.isReadOnly():
            self.setReadOnly(False)
            self.setProperty(self.qt_dynamic_property_get(), True)
            self.qt_style_polish()
        self.clearFocus()
        self.setFocusPolicy(Qt.NoFocus)
        del event

    def keyPressEvent(self, event):
        '''Return/Enter submits and releases focus; Up/Down step normally;
        any other key is processed with signals blocked (Qt >= 5.3) so that
        partially typed values are not written.
        '''
        key = event.key()
        if key in (Qt.Key_Return, Qt.Key_Enter):
            QDoubleSpinBox.keyPressEvent(self, event)
            self.clearFocus()
        elif key in (Qt.Key_Up, Qt.Key_Down):
            QDoubleSpinBox.keyPressEvent(self, event)
        else:
            block = LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3")
            if block:
                self.blockSignals(True)
            QDoubleSpinBox.keyPressEvent(self, event)
            if block:
                self.blockSignals(False)

    # The spin box should only accept wheel events if it already has the
    # focus. This is accomplished by reimplementing QWidget.wheelEvent
    # within a QDoubleSpinBox subclass:
    def wheelEvent(self, event):
        '''Ignore wheel events unless the widget has focus.'''
        if self.hasFocus() is False:
            event.ignore()
        else:
            QDoubleSpinBox.wheelEvent(self, event)
class reconnectQPushButton(QPushButton, QThread):
    '''Button that reconnects the channels ticked in the owning table.

    ``parent`` is expected to provide ``pv_gateway`` (one entry per row),
    ``item(row, col)``, ``no_columns``, ``cafe`` and ``cb_item_all``; the
    per-row check box is read from the last column.
    '''
    # NOTE(review): the class derives from two Qt classes (QPushButton and
    # QThread) and the initialiser does not pass `parent` to Qt; left
    # unchanged here, but worth confirming this is intended.

    def __init__(self, parent=None):
        super().__init__()
        self.parent = parent
        self.clicked.connect(self.onClicked)
        self.isdirty = False
        self._handles_to_reconnect = []
        self.reconnectThread = None

    def onClicked(self, event):
        '''Collect the handles of all checked rows and reconnect them.

        :param event: argument delivered by the clicked signal (unused).
        '''
        last_col = self.parent.no_columns - 1
        self._handles_to_reconnect = [
            gateway.handle
            for row, gateway in enumerate(self.parent.pv_gateway)
            if self.parent.item(row, last_col).checkState() == Qt.Checked]
        self.reconnect()
        QApplication.processEvents()

    def reconnect(self):
        '''Reconnect the collected handles and untick rows that are now
        connected, then untick the global check box.
        '''
        QApplication.processEvents()
        self.isdirty = True
        if self._handles_to_reconnect:
            self.parent.cafe.reconnect(self._handles_to_reconnect)
        self.isdirty = False
        # Uncheck reconnected channels
        last_col = self.parent.no_columns - 1
        for row, gateway in enumerate(self.parent.pv_gateway):
            cell = self.parent.item(row, last_col)
            if cell.checkState() != Qt.Checked:
                continue
            if self.parent.cafe.isConnected(gateway.handle):
                # setCheckState() takes a Qt.CheckState, not a bool.
                cell.setCheckState(Qt.Unchecked)
        # Uncheck global reconnect check box
        self.parent.cb_item_all.setCheckState(Qt.Unchecked)
class CAQTableWidget(QTableWidget):
'''Channel access enabled QTableWidget widget'''
#trigger_monitor_float = Signal(float, int, int)
#trigger_monitor_int = Signal(int, int, int)
#trigger_monitor_str = Signal(str, int, int)
#trigger_connect = Signal(int, str, int)
def hasNewData(self, _row, pv_data):
    """Tell whether ``pv_data`` differs from the data last stored for a row.

    Returns True when no previous data is stored for ``_row``, when either
    of the two timestamp components (``ts[1]`` is compared first, then
    ``ts[0]``) has changed, or when the status has changed; False
    otherwise.
    """
    previous = self.pv_gateway[_row].pvd_previous
    if previous is None:
        return True
    if previous.ts[1] != pv_data.ts[1]:
        return True
    if previous.ts[0] != pv_data.ts[0]:
        return True
    # Catch disconnect events: same timestamp but a different status.
    return bool(previous.status != pv_data.status)
def paint_rows(self, row_range: list = None, reset=False, last_row=None,
               columns=None):
    """Highlight a range of rows, or restore the default row appearance.

    Args:
        row_range: ``[start, end]`` row indices; rows ``start`` to
            ``end - 1`` are painted.  Only used when ``reset`` is false.
            When omitted or empty no regular row is painted (this used to
            raise ``IndexError``).
        reset: when true, every row except the bottom one is repainted
            with the background taken from the item in row 0 of the last
            column, and the vertical header items are recreated without
            the highlight font/colour.
        last_row: ``[header text, cell text]`` for the bottom row;
            defaults to ``[" ", " "]``.
        columns: which parts to paint - ``0`` selects the vertical header,
            ``1`` selects the cell in table column 0; defaults to ``[0]``.

    Side effect: ``self.font_pts11`` is (re)created with pixel size 11.
    """
    # Defaults are created per call instead of being shared mutable objects.
    if last_row is None:
        last_row = [" ", " "]
    if columns is None:
        columns = [0]

    _qcolor_last_line = QColor("#d1e8e9")
    self.font_pts11 = QTableWidgetItem().font()
    self.font_pts11.setPixelSize(11)
    _bottom_row = self.rowCount() - 1

    if reset:
        _qcolor = self.item(0, self.columnCount()-1).background()
        _start, _end = 0, _bottom_row
    elif row_range:
        _qcolor = _qcolor_last_line
        _start, _end = row_range[0], row_range[1]
    else:
        # Nothing requested: leave the regular rows untouched.
        _qcolor = _qcolor_last_line
        _start, _end = 0, 0

    def _style_bottom_cell(colour):
        """Write text, colour and font of the bottom row's first cell."""
        _item = self.item(_bottom_row, 0)
        _item.setTextAlignment(Qt.AlignCenter)
        _item.setText(str(last_row[1]))
        _item.setBackground(colour)
        _item.setFont(self.font_pts11)

    for _row in range(_start, _end):
        _cell = QTableWidgetItem("{0}".format(_row+1))
        if not reset:
            _cell.setFont(self.font_pts11)
            _cell.setBackground(_qcolor)
        if 1 in columns:
            self.item(_row, 0).setBackground(_qcolor)
            self.item(_row, 0).setFont(self.font_pts11)
        if 0 in columns:
            self.setVerticalHeaderItem(_row, _cell)

    # Bottom row
    if reset and 0 in columns:
        _cell = QTableWidgetItem("{0}".format(last_row[0]))
        _cell.setFont(self.font_pts11)
        self.setVerticalHeaderItem(_bottom_row, _cell)
        _style_bottom_cell(_qcolor)
    elif last_row[0] != " ":
        _cell = QTableWidgetItem("{0}".format(last_row[0]))
        _cell.setBackground(_qcolor_last_line)
        _cell.setFont(self.font_pts11)
        self.setVerticalHeaderItem(_bottom_row, _cell)
        if columns:
            _style_bottom_cell(_qcolor_last_line)
def widget_update(self):
    """Refresh value/timestamp cells from the cache for rows in unison mode.

    Used as the slot of the refresh timer.  Rows whose gateway has
    ``notify_unison`` unset are skipped, as are channels whose cached
    status is "never connected" or "connection down" (their cached data
    is still remembered in ``pvd_previous``).

    The target columns are taken from ``self.columns_dict`` so that the
    value lands in the 'Value' column also when the table was built
    without a timestamp column, and ``self.scale_factor`` is applied to
    non-string values exactly as in ``receive_monitor_update``.
    """
    _value_col = self.columns_dict['Value']
    # None when the table has no timestamp column.
    _ts_col = self.columns_dict.get('Timestamp')
    _display_cols = [_value_col] if _ts_col is None else [_value_col,
                                                          _ts_col]

    for _row, pvgate in enumerate(self.pv_gateway):
        if not pvgate.notify_unison:
            continue
        _pvd = pvgate.cafe.getPVCache(pvgate.handle)
        pvgate.pvd_previous = _pvd
        if _pvd.status in (self.cyca.ICAFE_CS_NEVER_CONN,
                           self.cyca.ICAFE_CA_OP_CONN_DOWN):
            continue

        _value = _pvd.value[0]
        if (self.scale_factor != 1 and _value is not None
                and not isinstance(_value, str)):
            _value = _value * self.scale_factor
        _value = pvgate.format_display_value(_value)
        qtwi = QTableWidgetItem(str(_value) + " ")
        f = qtwi.font()
        f.setPointSize(8)
        qtwi.setFont(f)
        self.setItem(_row, _value_col, qtwi)
        self.item(_row, _value_col).setTextAlignment(Qt.AlignRight |
                                                     Qt.AlignVCenter)

        if _ts_col is not None:
            # Pad the date string with zeros up to nanosecond length, then
            # cut it back to the configured resolution.
            # NOTE(review): receive_monitor_update uses the un-padded
            # length for the cut - confirm which variant is intended.
            _ts_date = _pvd.tsDateAsString
            _ts_str_len = len(_ts_date)
            _ilength_target = self.format_ts_nano
            while _ts_str_len < _ilength_target:
                _ts_date += "0"
                _ilength_target = _ilength_target - 1
            _ts_str_len = len(_ts_date)
            _ts_str = _ts_date[0: _ts_str_len - (
                self.format_ts_nano - self.format_ts_decimal_part)]
            _ts_str_len = len(_ts_str)
            _ilength_target = self.format_ts_decimal_part
            if self.format_ts_decimal_part == self.format_ts_deci:
                if _ts_str_len == self.format_ts_sec:
                    _ts_str += "."
            while _ts_str_len < _ilength_target:
                _ts_str += "0"
                _ilength_target = _ilength_target - 1
            qtwi = QTableWidgetItem(_ts_str)
            f = qtwi.font()
            f.setPointSize(8)
            qtwi.setFont(f)
            self.setItem(_row, _ts_col, qtwi)
            self.item(_row, _ts_col).setTextAlignment(Qt.AlignCenter)

        _prop = pvgate.qt_dynamic_property_get()
        if _prop == pvgate.READBACK_ALARM:
            alarm_severity = _pvd.alarmSeverity
            if alarm_severity == pvgate.cyca.SEV_MAJOR:
                _bgcolor = pvgate.fg_alarm_major
                _fgcolor = "black"
            elif alarm_severity == pvgate.cyca.SEV_MINOR:
                _bgcolor = pvgate.fg_alarm_minor
                _fgcolor = "black"
            elif alarm_severity == pvgate.cyca.SEV_INVALID:
                _bgcolor = pvgate.fg_alarm_invalid
                _fgcolor = "#777777"
            else:
                _bgcolor = pvgate.fg_alarm_noalarm
                _fgcolor = "black"
            # Colors for bg/fg reversed as is the old norm
            for _col in _display_cols:
                self.item(_row, _col).setBackground(QColor(_bgcolor))
                self.item(_row, _col).setForeground(QColor(_fgcolor))
        elif _prop == pvgate.READBACK_STATIC:
            for _col in _display_cols:
                self.item(_row, _col).setBackground(
                    QColor(pvgate.bg_readback))
        elif _prop == pvgate.DISCONNECTED:
            for _col in _display_cols:
                self.item(_row, _col).setBackground(QColor("#ffffff"))
                self.item(_row, _col).setForeground(QColor("#777777"))
        else:
            print(_prop, "widget_update unknown in element/row", _row,
                  _row+1)
    QApplication.processEvents()
def __init__(self, parent=None, pv_list: list = None,
             monitor_callback=None, pv_within_daq_group: bool = False,
             color_mode=None, show_units: bool = True, prefix: str = "",
             suffix: str = "", ts_res: str = "milli",
             init_column: bool = False, init_list: list = None,
             notify_freq_hz: int = 0, notify_unison: bool = True,
             precision: int = 0, scale_factor: float = 1,
             show_timestamp: bool = True, pv_list_show: list = None):
    """Create the table and one PVGateway per process variable.

    Args:
        parent: passed on to every PVGateway (the table itself is created
            without a Qt parent).
        pv_list: PV names, one table row each; defaults to
            ``["PV_NAME_NOT_GIVEN"]``.
        monitor_callback, pv_within_daq_group, show_units, prefix, suffix:
            forwarded unchanged to every PVGateway.
        color_mode: a single mode applied to all PVs, or a list with one
            mode per PV (surplus entries are ignored).
        ts_res: timestamp resolution; matched by substring, in this order:
            "nano", "micro", "milli", "deci", "sec".  Unknown text falls
            back to milliseconds.
        init_column: add an 'Init' column with stored initial values.
        init_list: names to which the initial values apply; defaults to
            ``pv_list`` when ``init_column`` is set.
        notify_freq_hz: refresh frequency (absolute value used); 0
            disables the refresh timer.
        notify_unison: refresh all rows together from a timer; only
            effective when ``notify_freq_hz`` is non-zero.
        precision: forwarded to every PVGateway.
        scale_factor: factor applied to displayed values.
        show_timestamp: add a 'Timestamp' column.
        pv_list_show: names shown in the 'PV' column instead of
            ``pv_list``; the column is hidden when its first entry is
            falsy.
    """
    super().__init__()
    # Fresh objects per instance instead of shared mutable defaults.
    if pv_list is None:
        pv_list = ["PV_NAME_NOT_GIVEN"]
    if init_list is None:
        init_list = []

    # Column layout: [PV] [Init] Value [Timestamp] Reconnect
    self.columns_dict = {}
    _column_dict_value = 0
    if pv_list_show is not None:
        if pv_list_show[0]:
            self.columns_dict['PV'] = _column_dict_value
            _column_dict_value += 1
    else:
        self.columns_dict['PV'] = _column_dict_value
        _column_dict_value += 1
    if init_column:
        self.columns_dict['Init'] = _column_dict_value
        _column_dict_value += 1
    self.columns_dict['Value'] = _column_dict_value
    if show_timestamp:
        _column_dict_value += 1
        self.columns_dict['Timestamp'] = _column_dict_value
    _column_dict_value += 1
    self.columns_dict['Reconnect'] = _column_dict_value
    self.setWindowModality(Qt.ApplicationModal)
    self.no_columns = _column_dict_value + 1

    self.init_column = init_column
    self.init_list = init_list
    if self.init_column and not self.init_list:
        self.init_list = pv_list
    self.icount = 0
    self.notify_freq_hz = abs(notify_freq_hz)
    self.notify_freq_hz_default = self.notify_freq_hz
    self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \
        1000 / self.notify_freq_hz
    self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz)
    self.precision = precision
    self.scale_factor = scale_factor
    self.show_timestamp = show_timestamp

    # Lengths of the timestamp string at each resolution.
    self.format_ts_nano = 31  # max length of date
    self.format_ts_micro = 28
    self.format_ts_milli = 25
    self.format_ts_deci = 23  # -8
    self.format_ts_sec = 21
    _ts_res = ts_res.lower()
    # Order matters: "sec" is a substring of e.g. "nanosecond".
    for _keyword, _length in (("nano", self.format_ts_nano),
                              ("micro", self.format_ts_micro),
                              ("milli", self.format_ts_milli),
                              ("deci", self.format_ts_deci),
                              ("sec", self.format_ts_sec)):
        if _keyword in _ts_res:
            self.format_ts_decimal_part = _length
            break
    else:
        self.format_ts_decimal_part = self.format_ts_milli

    self.pv2item_dict = {}
    self.pv_list = pv_list
    self.pv_gateway = [None] * len(self.pv_list)
    self.pv_list_show = pv_list_show
    if self.pv_list_show is None:
        self.pv_list_show = self.pv_list

    _color_mode = [None] * len(self.pv_list)
    if color_mode is not None:
        if isinstance(color_mode, list):
            # zip() stops at the shorter list, so a surplus of modes no
            # longer raises IndexError.
            for i, (_mode, _) in enumerate(zip(color_mode, self.pv_list)):
                _color_mode[i] = _mode
        else:
            _color_mode = [color_mode] * len(self.pv_list)

    for i in range(0, len(self.pv_list)):
        self.pv_gateway[i] = PVGateway(
            parent, self.pv_list[i], monitor_callback,
            pv_within_daq_group, _color_mode[i], show_units, prefix, suffix,
            connect_triggers=False, notify_freq_hz=self.notify_freq_hz,
            notify_unison=self.notify_unison, precision=self.precision)
        _gateway = self.pv_gateway[i]
        _gateway.is_initialize_complete()
        _gateway.trigger_connect.connect(self.receive_connect_update)
        _gateway.trigger_monitor_str.connect(self.receive_monitor_update)
        _gateway.trigger_monitor_int.connect(self.receive_monitor_update)
        _gateway.trigger_monitor_float.connect(self.receive_monitor_update)
        _gateway.widget_class = "QTableWidgetItem"
        _gateway.qt_property_initial_values(
            qt_object_name=_gateway.PV_READBACK, tool_tip=False)

    # required for reconnect
    self.cafe = self.pv_gateway[0].cafe
    self.cyca = self.pv_gateway[0].cyca

    self.timer = None
    if self.notify_unison:
        self.timer = QTimer()
        self.timer.timeout.connect(self.widget_update)
        self.timer.singleShot(0, self.widget_update)
        # QTimer.start() takes an integer number of milliseconds;
        # 1000 / hz is a float.
        self.timer.start(int(round(self.notify_milliseconds)))

    self.configure_widget()

    # Connect only deals with colours - only helps on reconnect.
    # In any case monitors take over.
    # Got to do this earlier or emit immediately after!!
    for _gateway in self.pv_gateway:
        if self.cafe.isConnected(_gateway.pv_name):
            _gateway.trigger_connect.emit(
                _gateway.handle, str(_gateway.pv_name),
                _gateway.cyca.ICAFE_CS_CONN)
    for _gateway in self.pv_gateway:
        if not _gateway.pv_within_daq_group:
            _gateway.monitor_start()
    if init_column:
        self.update_init_values()
    self.configure_context_menu()
def configure_context_menu(self):
    """Create the table's context menu with its single configure action.

    The menu is stored in ``self.table_context_menu``.  On Qt >= 5.3 the
    "Configure Table PVs" action is framed by two "---" sections.
    """
    menu = QMenu()
    self.table_context_menu = menu
    menu.setObjectName("contextMenu")
    menu.setWindowModality(Qt.NonModal)

    sections_supported = (LooseVersion(QT_VERSION_STR) >=
                          LooseVersion("5.3"))
    if sections_supported:
        menu.addSection("---")
    configure_action = QAction("Configure Table PVs", self)
    configure_action.triggered.connect(self.display_table_parameters)
    menu.addAction(configure_action)
    if sections_supported:
        menu.addSection("---")
    QApplication.processEvents()
def restore_init_values(self, pv_list: list = None):
    """Write the stored initial values back to the devices.

    Args:
        pv_list: PV names to restore.  When omitted or empty - or when
            the method is invoked by a button's ``clicked`` signal, which
            delivers a bool - every PV with a stored initial value is
            restored.

    If nothing is stored, no 'set' is attempted (this used to raise
    ``ValueError``).  PVs whose 'set' fails are listed in a message box.
    The "Update" button is re-enabled in all cases.
    """
    _set_values_dict = self.get_init_values()
    if not pv_list or isinstance(pv_list, bool):
        _pvs_to_set = list(_set_values_dict)
        _values_to_set = list(_set_values_dict.values())
    else:
        _pvs_to_set = [pv for pv in pv_list if pv in _set_values_dict]
        _values_to_set = [_set_values_dict[pv] for pv in _pvs_to_set]

    if not _pvs_to_set:
        # Nothing to restore; leave the devices untouched.
        self.init_value_button.setEnabled(True)
        return

    status, status_list = self.cafe.setScalarList(_pvs_to_set,
                                                  _values_to_set)
    if status != self.cyca.ICAFE_NORMAL:
        _mess = ("The following device(s) reported an error " +
                 "in 'set' operation:")
        for i, status_value in enumerate(status_list):
            if status_value != self.cyca.ICAFE_NORMAL:
                _mess += ("\n" + _pvs_to_set[i] + " has status = " +
                          str(status_value) + " " +
                          self.cafe.getStatusCodeAsString(status_value) +
                          " " + self.cafe.getStatusInfo(status_value))
        qm = QMessageBox()
        qm.setText(_mess)
        qm.exec()
    QApplication.processEvents()
    self.init_value_button.setEnabled(True)
def is_same_as_init_values(self):
    """Return True when every current value equals its stored initial value.

    The 'Init' and 'Value' columns are read with ``get_column_values``
    and compared entry by entry, in row order.  Empty columns compare as
    equal (this used to raise ``ValueError``).
    """
    _init_values = self.get_column_values(self.columns_dict['Init'])
    _current_values = self.get_column_values(self.columns_dict['Value'])
    return all(
        _init == _current for _init, _current in zip(
            _init_values.values(), _current_values.values()))
def get_column_values(self, column_no):
    """Read one table column and return ``{pv_name: value}``.

    For every PV row the text of column ``column_no`` is parsed: the
    first number found (optional sign, decimals and exponent, e.g.
    ``-1.2e-05``) is returned as a float; text without any number (e.g.
    an enum string) is returned as a string with the surrounding
    whitespace removed.  Only rows whose name in table column 0 is listed
    in ``self.pv_list_show`` are included; the keys are the gateways'
    ``pv_name``.

    NOTE(review): text such as "Mode 2" is returned as 2.0 because the
    first number wins - confirm this is acceptable for enum strings.
    """
    # The exponent part is needed so that "1.2e-05" is not read as 1.2.
    _number = re.compile(r'-?\d+\.?\d*(?:[eE][-+]?\d+)?')
    _values_dict = {}
    for _row, pvgate in enumerate(self.pv_gateway):
        _cell_text = self.item(_row, column_no).text()
        _shown_name = self.item(_row, 0).text()
        _match = _number.search(_cell_text)
        if _match is None:
            print("row", _row, "values", _cell_text, _shown_name)
            # Can be enum string; drop the padding added for display.
            _value = _cell_text.strip()
        else:
            _value = float(_match.group())
        if _shown_name in self.pv_list_show:
            _values_dict[pvgate.pv_name] = _value
    return _values_dict
def get_init_values(self):
    """Return ``{pv_name: initial value}`` read from the 'Init' column.

    An empty dict is returned when the table has no 'Init' column
    (previously a ``KeyError``).
    """
    _init_column = self.columns_dict.get('Init')
    if _init_column is None:
        return {}
    return self.get_column_values(_init_column)
def get_init_values_previous(self):
    """Collect the stored initial values (earlier implementation).

    Walks over all PV rows, reads the text of the 'Init' and 'PV'
    columns and returns ``{pv_name: value}`` for the rows whose shown
    name is contained in ``self.init_list``.  The value is the first
    number found in the 'Init' text as a float, or the unmodified text
    when it holds no number.
    """
    stored = {}
    for row_no, gateway in enumerate(self.pv_gateway):
        init_text = self.item(row_no, self.columns_dict['Init']).text()
        shown_pv = self.item(row_no, self.columns_dict['PV']).text()
        numbers = re.findall(r'-?\d+\.?\d*', init_text)
        if numbers:
            init_value = float(numbers[0])
        else:
            print("//row", row_no, "values", init_text, shown_pv)
            init_value = init_text  # Can be enum str
        if shown_pv in self.init_list:
            stored[gateway.pv_name] = init_value
    return stored
def update_init_values(self):
    """Copy the currently cached value of every PV into the 'Init' column.

    Does nothing when the table has no 'Init' column.  Rows whose cache
    holds no value are left unchanged.  ``self.scale_factor`` is applied
    to non-string values before formatting.
    """
    _column_no = self.columns_dict.get('Init')
    if _column_no is None:
        return
    for _row, pvgate in enumerate(self.pv_gateway):
        _value = pvgate.cafe.getCache(pvgate.handle)
        if _value is None:
            continue
        # Strings (e.g. enum values) must not be multiplied.
        if self.scale_factor != 1 and not isinstance(_value, str):
            _value = _value * self.scale_factor
        _value = pvgate.format_display_value(_value)
        qtwi = QTableWidgetItem(str(_value) + " ")
        _f = qtwi.font()
        _f.setPointSize(8)
        qtwi.setFont(_f)
        # Write into the 'Init' column itself rather than a fixed column 1,
        # which is only correct while the PV column is shown.
        self.setItem(_row, _column_no, qtwi)
        self.item(_row, _column_no).setTextAlignment(Qt.AlignRight |
                                                     Qt.AlignVCenter)
def configure_widget(self):
    """Build the static layout of the table.

    Creates one row per PV plus one extra bottom row, sets column widths,
    fills every cell with an (initially empty) item, adds a checkable
    item per PV in the last column and places the "Restore"/"Update"
    buttons (only with an init column), the "Reconnect" button and the
    global check box in the bottom row.  Finally the header labels,
    selection behaviour, table size and default cell colours are set.
    """
    width_pvname = 180
    width_value = 90
    width_timestamp = 210
    width_checkbox = 22

    cols = self.columns_dict
    has_pv = 'PV' in cols
    has_init = 'Init' in cols
    has_timestamp = 'Timestamp' in cols
    n_pvs = len(self.pv_gateway)  # also the index of the extra bottom row
    last_col = self.no_columns - 1

    def _use_point_size(target, points=8):
        """Give a widget or table item its own font of the given size."""
        _font = target.font()
        _font.setPointSize(points)
        target.setFont(_font)

    self.setRowCount(n_pvs + 1)
    self.setColumnCount(self.no_columns)
    self.setEditTriggers(QAbstractItemView.NoEditTriggers)
    self.resizeColumnsToContents()
    self.resizeRowsToContents()

    # Column widths
    if has_pv:
        self.setColumnWidth(cols['PV'], width_pvname)
        pv_col = cols['PV']
    self.setColumnWidth(cols['Value'], width_value)
    if has_init:
        self.setColumnWidth(cols['Init'], width_value)
    if has_timestamp:
        self.setColumnWidth(cols['Timestamp'], width_timestamp)
    self.setColumnWidth(cols['Reconnect'], width_checkbox)

    # One row per PV: name, empty data cells, reconnect check box
    centred = Qt.AlignHCenter | Qt.AlignVCenter
    for row, gateway in enumerate(self.pv_gateway):
        if has_pv:
            name_item = QTableWidgetItem(self.pv_list_show[row])
            _use_point_size(name_item)
            self.setItem(row, pv_col, name_item)
            self.item(row, pv_col).setTextAlignment(centred)
            first_empty = 1
        else:
            first_empty = 0
        for col in range(first_empty, last_col):
            self.setItem(row, col, QTableWidgetItem(str("")))
            self.item(row, col).setTextAlignment(centred)
        self.pv2item_dict[gateway] = row

        tick_item = QTableWidgetItem()
        tick_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
        tick_item.setCheckState(Qt.Unchecked)
        tick_item.setTextAlignment(Qt.AlignCenter)
        tick_item.setToolTip(gateway.pv_name)
        self.setItem(row, last_col, tick_item)
        self.item(row, last_col).setTextAlignment(Qt.AlignCenter)

    # Bottom row: "Update" and "Restore" buttons for the initial values
    if self.init_column:
        self.init_widget = QWidget()
        init_box = QHBoxLayout(self.init_widget)
        self.init_value_button = QPushButton()
        self.init_value_button.setText("Update")
        _use_point_size(self.init_value_button)
        self.init_value_button.setFixedWidth(64)
        self.init_value_button.clicked.connect(self.update_init_values)
        self.init_value_button.setToolTip(
            ("Stores initial, pre-measurement value. Update is also " +
             "typically executed automatically before analysis procedure."))
        init_box.addWidget(self.init_value_button)
        init_box.setAlignment(Qt.AlignCenter)
        init_box.setContentsMargins(1, 1, 1, 0)  # Required
        self.init_widget.setLayout(init_box)
        self.setCellWidget(n_pvs, 1, self.init_widget)

        restore_holder = QWidget()
        restore_box = QHBoxLayout(restore_holder)
        self.restore_value_button = QPushButton()
        self.restore_value_button.setStyleSheet(
            "QPushButton{background-color: rgb(212, 219, 157);}")
        self.restore_value_button.setText("Restore")
        _use_point_size(self.restore_value_button)
        self.restore_value_button.setFixedWidth(80)
        self.restore_value_button.clicked.connect(self.restore_init_values)
        self.restore_value_button.setToolTip(
            ("Restore devices to their pre-measurement values"))
        restore_box.addWidget(self.restore_value_button)
        restore_box.setAlignment(Qt.AlignCenter)
        restore_box.setContentsMargins(1, 1, 0, 0)
        restore_holder.setLayout(restore_box)
        self.setCellWidget(n_pvs, 0, restore_holder)

    # Do not display a row number for the bottom row (Reconnect button)
    self.setVerticalHeaderItem(n_pvs, QTableWidgetItem(str("")))
    self.setItem(n_pvs, 0, QTableWidgetItem(str("")))

    reconnect_holder = QWidget()
    self.reconnect_button = reconnectQPushButton(self)  # self required
    button_font = self.reconnect_button.font()
    if has_timestamp:
        button_font.setPointSize(8)
        self.reconnect_button.setFixedWidth(100)
    else:
        button_font.setPointSize(7)  # 6
        self.reconnect_button.setFixedWidth(66)  # 58
    self.reconnect_button.setFont(button_font)
    self.reconnect_button.setText("Reconnect")
    reconnect_box = QHBoxLayout(reconnect_holder)
    reconnect_box.addWidget(self.reconnect_button)
    reconnect_box.setAlignment(Qt.AlignCenter)
    reconnect_box.setContentsMargins(0, 0, 0, 0)  # Required
    self.setCellWidget(n_pvs, self.no_columns-2, reconnect_holder)

    self.cb_item_all = QCheckBox()
    self.cb_item_all.setCheckState(Qt.Unchecked)
    self.cb_item_all.stateChanged.connect(self.reconnectStateChanged)
    self.cb_item_all.setObjectName("Reconnect")
    self.setCellWidget(n_pvs, last_col, self.cb_item_all)

    # Horizontal header labels
    if has_pv:
        self.setHorizontalHeaderItem(
            cols['PV'], QTableWidgetItem("Process Variable"))
    if has_init:
        self.setHorizontalHeaderItem(
            cols['Init'], QTableWidgetItem("Initial Value"))
    self.setHorizontalHeaderItem(cols['Value'], QTableWidgetItem("Value"))
    if has_timestamp:
        self.setHorizontalHeaderItem(
            cols['Timestamp'], QTableWidgetItem("Timestamp"))
    self.setHorizontalHeaderItem(cols['Reconnect'], QTableWidgetItem("R"))

    # Interaction and geometry
    self.setFocusPolicy(Qt.NoFocus)
    self.setEditTriggers(QAbstractItemView.NoEditTriggers)
    self.setSelectionMode(QAbstractItemView.NoSelection)
    self.verticalHeader().setDefaultAlignment(Qt.AlignRight)
    self.verticalHeader().setFixedWidth(22)
    metrics_font = QFont("Sans Serif")
    metrics_font.setPointSize(12)
    metrics = QFontMetricsF(metrics_font)
    height_factor = 1.18 if (LooseVersion(QT_VERSION_STR) <
                             LooseVersion("5.0")) else 1
    self.setFixedHeight(
        int(metrics.lineSpacing() * height_factor * (n_pvs + 3)))
    self.setMinimumWidth(650 if self.init_column else 620)

    # Default colours and alignment of the data cells
    black = "#000000"
    white = "#ffffff"
    for row in range(n_pvs):
        if has_pv:
            self.item(row, pv_col).setForeground(QColor(black))
            first_data = 1
        else:
            first_data = 0
        for col in range(first_data, self.no_columns-2):
            self.item(row, col).setForeground(QColor(black))
            self.item(row, col).setTextAlignment(Qt.AlignRight |
                                                 Qt.AlignVCenter)
        self.item(row, cols['Value']).setBackground(QColor(white))
        if has_timestamp:
            self.item(row, cols['Timestamp']).setTextAlignment(
                Qt.AlignCenter)
            self.item(row, cols['Timestamp']).setBackground(QColor(white))
@Slot(int)
def reconnectStateChanged(self, state):
    """Mirror the global check box onto every row's reconnect check box.

    ``Qt.Unchecked`` clears all row check boxes; any other state ticks
    them all.
    """
    row_state = Qt.Unchecked if state == Qt.Unchecked else Qt.Checked
    for row in range(len(self.pv_gateway)):
        checkbox = self.item(row, self.columns_dict['Reconnect'])
        checkbox.setCheckState(row_state)
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
    """Show a monitored value (and its timestamp) in the sender's row.

    The row is looked up from the emitting PVGateway.  Numeric values are
    multiplied by ``self.scale_factor``; string values (e.g. enum
    strings) are displayed unscaled, since multiplying a string either
    raises or repeats it.  Cell colours follow the gateway's dynamic
    property and the alarm severity.

    Args:
        value: new value (str, int or float).
        status: channel status delivered with the value (not used here).
        alarm_severity: alarm severity delivered with the value.
    """
    _row = self.pv2item_dict[self.sender()]
    pvgate = self.pv_gateway[_row]
    _value_col = self.columns_dict['Value']
    # None when the table has no timestamp column.
    _ts_col = self.columns_dict.get('Timestamp')
    _display_cols = [_value_col] if _ts_col is None else [_value_col,
                                                          _ts_col]

    pvgate.time_monotonic = time.monotonic()
    if self.scale_factor != 1 and not isinstance(value, str):
        value = value * self.scale_factor
    _value = pvgate.format_display_value(value)
    qtwi = QTableWidgetItem(str(_value) + " ")
    f = qtwi.font()
    f.setPointSize(8)
    qtwi.setFont(f)
    self.setItem(_row, _value_col, qtwi)
    self.item(_row, _value_col).setTextAlignment(
        Qt.AlignRight | Qt.AlignVCenter)

    if _ts_col is not None:
        _pvd = pvgate.cafe.getPVCache(pvgate.handle)
        _ts_date = _pvd.tsDateAsString
        _ts_str_len = len(_ts_date)
        _ilength_target = self.format_ts_nano
        while _ts_str_len < _ilength_target:
            _ts_date += "0"
            _ilength_target = _ilength_target - 1
        # NOTE(review): the cut below uses the length measured *before*
        # zero padding (the refresh of the length was commented out in
        # the original) - kept as is; confirm this is intended.
        _ts_str = _ts_date[0: _ts_str_len-(
            self.format_ts_nano-self.format_ts_decimal_part)]
        _ts_str_len = len(_ts_str)
        _ilength_target = self.format_ts_decimal_part
        if self.format_ts_decimal_part == self.format_ts_deci:
            if _ts_str_len == self.format_ts_sec:
                _ts_str += "."
        while _ts_str_len < _ilength_target:
            _ts_str += "0"
            _ilength_target = _ilength_target - 1
        qtwi = QTableWidgetItem(_ts_str)
        f = qtwi.font()
        f.setPointSize(8)
        qtwi.setFont(f)
        self.setItem(_row, _ts_col, qtwi)
        self.item(_row, _ts_col).setTextAlignment(Qt.AlignCenter)

    _prop = pvgate.qt_dynamic_property_get()
    if _prop == pvgate.READBACK_ALARM:
        if alarm_severity == pvgate.cyca.SEV_MAJOR:
            _bgcolor = pvgate.fg_alarm_major
            _fgcolor = "black"
        elif alarm_severity == pvgate.cyca.SEV_MINOR:
            _bgcolor = pvgate.fg_alarm_minor
            _fgcolor = "black"
        elif alarm_severity == pvgate.cyca.SEV_INVALID:
            _bgcolor = pvgate.fg_alarm_invalid
            _fgcolor = "#777777"
        else:
            _bgcolor = pvgate.fg_alarm_noalarm
            _fgcolor = "black"
        # Colors for bg/fg reversed as is the old norm
        for _col in _display_cols:
            self.item(_row, _col).setBackground(QColor(_bgcolor))
            self.item(_row, _col).setForeground(QColor(_fgcolor))
    elif _prop == pvgate.DISCONNECTED or \
            alarm_severity == pvgate.cyca.SEV_INVALID:
        for _col in _display_cols:
            self.item(_row, _col).setBackground(QColor("#ffffff"))
            self.item(_row, _col).setForeground(QColor("#777777"))
    elif _prop == pvgate.READBACK_STATIC:
        for _col in _display_cols:
            self.item(_row, _col).setBackground(
                QColor(pvgate.bg_readback))
    else:
        print(_prop, pvgate.DISCONNECTED,
              "(in monitor) unknown in element/row no.", _row, _row+1)
    QApplication.processEvents(QEventLoop.AllEvents, 10)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
    '''Triggered by connect signal.

    Forwards the notification to the emitting gateway (without posting a
    display value) and, if the gateway then reports the DISCONNECTED
    property, paints the row's value and timestamp cells grey on white.
    '''
    row = self.pv2item_dict[self.sender()]
    gateway = self.pv_gateway[row]
    gateway.receive_connect_update(handle, pv_name, status,
                                   post_display=False)
    if gateway.qt_dynamic_property_get() == gateway.DISCONNECTED:
        affected = ['Value']
        if 'Timestamp' in self.columns_dict:
            affected.append('Timestamp')
        for column_name in affected:
            column = self.columns_dict[column_name]
            self.item(row, column).setBackground(QColor("#ffffff"))
            self.item(row, column).setForeground(QColor("#777777"))
    QApplication.processEvents()
def table_precision_user_changed(self, new_value):
    """Apply a user-selected display precision to every PV of the table.

    For a gateway with control data the precision is limited to
    ``pv_ctrl.precision``; a gateway without control data gets
    ``new_value`` unchanged (it no longer inherits the limit of the
    gateway processed before it).  Rows whose cached value is a float
    are re-emitted so that they are redrawn with the new precision.

    ``self.pvgateway_precision`` holds the precision applied last.
    """
    self.pvgateway_precision = new_value
    for pvgate in self.pv_gateway:
        if pvgate.pv_ctrl is not None:
            _precision = min(pvgate.pv_ctrl.precision, new_value)
        else:
            _precision = new_value
        self.pvgateway_precision = _precision
        pvgate.precision_user = _precision
        pvgate.precision = _precision

        _pvd = self.cafe.getPVCache(pvgate.handle)
        _cached = _pvd.value[0]
        # Precision only affects floats; None and other types are skipped.
        if isinstance(_cached, float):
            pvgate.trigger_monitor_float.emit(
                _cached, _pvd.status, _pvd.alarmSeverity)
def table_precision_ioc_reset(self):
    """Reset the user precision to the maximum precision stored on the table.

    If the spin box already shows that maximum, the change handler is
    called directly; otherwise the spin box value is set to it.
    """
    target = self.max_precision_value
    spin_box = self.table_precision_user_wgt
    if spin_box.value() != target:
        spin_box.setValue(target)
        return
    self.table_precision_user_changed(target)
def table_refresh_rate_changed(self, new_idx):
    """Switch the table to the refresh rate selected in the combo box.

    Args:
        new_idx: combo box index; looked up in
            ``self.refresh_freq_combox_idx_dict`` to get the frequency in
            Hz (0 means "direct", i.e. no timer-driven refresh).

    In direct mode all monitors are stopped and restarted with unison
    updates switched off.  Otherwise gateways not yet in unison mode are
    restarted in unison mode, and for the others the monitor policy is
    updated with the new interval.  The refresh timer is then stopped (or
    created) and restarted with the new interval, which is passed to
    ``QTimer.start`` as an integer number of milliseconds.
    """
    _notify_freq_hz = self.refresh_freq_combox_idx_dict[new_idx]
    _notify_milliseconds = 0 if _notify_freq_hz == 0 else \
        1000 / _notify_freq_hz
    self.notify_freq_hz = _notify_freq_hz

    if _notify_milliseconds == 0:
        for pvgate in self.pv_gateway:
            pvgate.notify_unison = False
            pvgate.notify_milliseconds = _notify_milliseconds
            pvgate.notify_freq_hz = self.notify_freq_hz
            pvgate.monitor_stop()
        time.sleep(0.01)
        for pvgate in self.pv_gateway:
            pvgate.monitor_start()
    else:
        for pvgate in self.pv_gateway:
            if not pvgate.notify_unison:
                pvgate.monitor_stop()
        for pvgate in self.pv_gateway:
            pvgate.notify_milliseconds = _notify_milliseconds
            pvgate.notify_freq_hz = self.notify_freq_hz
            if not pvgate.notify_unison:
                pvgate.notify_unison = True
                pvgate.monitor_start()
            else:
                self.cafe.updateMonitorPolicyDeltaMS(
                    pvgate.handle, pvgate.monitor_id,
                    pvgate.notify_milliseconds)

    if self.timer is not None:
        self.timer.stop()
    else:
        self.timer = QTimer()
        self.timer.timeout.connect(self.widget_update)
        self.timer.singleShot(0, self.widget_update)
    if _notify_milliseconds > 0:
        # 1000 / hz is a float; the timer interval must be an int.
        self.timer.start(int(round(_notify_milliseconds)))
def table_ts_resolution_changed(self, new_idx):
    """Apply the timestamp resolution chosen in the combo box.

    ``new_idx`` selects, by position, one of the lengths stored in
    ``self.ts_combox_idx_dict``; an index outside that range leaves the
    resolution unchanged.  Afterwards every PV with a cached value is
    re-emitted through the monitor signal matching its type (float, int,
    otherwise str) so that its row is redrawn.
    """
    lengths = list(self.ts_combox_idx_dict.values())
    if 0 <= new_idx < len(lengths):
        self.format_ts_decimal_part = lengths[new_idx]

    for pvgate in self.pv_gateway:
        cached = self.cafe.getPVCache(pvgate.handle)
        current = cached.value[0]
        if current is None:
            continue
        if isinstance(current, float):
            signal, payload = pvgate.trigger_monitor_float, current
        elif isinstance(current, int):
            signal, payload = pvgate.trigger_monitor_int, current
        else:
            signal, payload = pvgate.trigger_monitor_str, str(current)
        signal.emit(payload, cached.status, cached.alarmSeverity)
def display_table_parameters(self):
    """Open a modal dialog for the table's display parameters.

    The dialog offers, from top to bottom:
    * user precision (spin box) and a reset button - only when at least
      one PV has control data with a precision above zero,
    * the timestamp resolution - only when a timestamp column exists,
    * the refresh rate.

    Side effects: ``self.initial_value``, ``self.max_precision_value``,
    ``self.refresh_freq_combox_idx_dict`` and, depending on the rows
    shown, ``self.table_precision_user_wgt`` and
    ``self.ts_combox_idx_dict`` are (re)assigned.
    """
    dialog = QDialog(self)
    dialog.setWindowTitle("PV Parameters")
    outer = QVBoxLayout()
    label_width = 120
    widget_width = 160
    row_width = label_width + widget_width + 20

    # Largest IOC precision and largest precision currently in use
    self.initial_value = 0
    self.max_precision_value = 0
    for pvgate in self.pv_gateway:
        if pvgate.pv_ctrl is not None:
            if pvgate.pv_ctrl.precision > 0:
                self.max_precision_value = max(self.max_precision_value,
                                               pvgate.pv_ctrl.precision)
                self.initial_value = max(self.initial_value,
                                         pvgate.precision)

    if self.max_precision_value > 0:
        # precision user
        user_row = QWidget()
        user_box = QHBoxLayout()
        user_label = QLabel("Precision (user):")
        self.table_precision_user_wgt = QSpinBox(self)
        user_spin = self.table_precision_user_wgt
        user_spin.setFocusPolicy(Qt.NoFocus)
        user_spin.setValue(self.initial_value)
        user_spin.setMaximum(self.max_precision_value)
        user_spin.valueChanged.connect(self.table_precision_user_changed)
        user_label.setAlignment(Qt.AlignLeft)
        user_spin.setAlignment(Qt.AlignLeft)
        user_box.addWidget(user_label)
        user_box.addWidget(user_spin)
        user_box.setAlignment(Qt.AlignLeft)
        user_row.setLayout(user_box)
        user_label.setFixedWidth(label_width)
        user_spin.setFixedWidth(40)
        user_row.setFixedWidth(row_width)

        # precision ioc
        ioc_row = QWidget()
        ioc_box = QHBoxLayout()
        ioc_label = QLabel("Precision (ioc): ")
        ioc_button = QPushButton(self)
        ioc_button.setText("Reset")
        ioc_button.clicked.connect(self.table_precision_ioc_reset)
        ioc_label.setAlignment(Qt.AlignLeft)
        ioc_box.addWidget(ioc_label)
        ioc_box.addWidget(ioc_button)
        ioc_box.setAlignment(Qt.AlignLeft)
        ioc_row.setLayout(ioc_box)
        ioc_label.setFixedWidth(label_width)
        ioc_button.setFixedWidth(50)
        ioc_row.setFixedWidth(row_width)

        outer.addWidget(user_row)
        outer.addWidget(ioc_row)

    if 'Timestamp' in self.columns_dict:
        # time-stamp
        ts_row = QWidget()
        ts_box = QHBoxLayout()
        ts_label = QLabel("Timestamp: ")
        self.ts_combox_idx_dict = {
            'second (s)': self.format_ts_sec,
            'decisecond (ds)': self.format_ts_deci,
            'millisecond (ms)': self.format_ts_milli,
            'microsecond (\u03bcs)': self.format_ts_micro,
            'nanosecond (ns)': self.format_ts_nano}
        ts_combo = QComboBox(self)
        for caption in self.ts_combox_idx_dict:
            ts_combo.addItem(caption)
        # Preselect the resolution currently in use (first entry if none)
        selected = 0
        for position, length in enumerate(
                self.ts_combox_idx_dict.values()):
            if length == self.format_ts_decimal_part:
                selected = position
                break
        ts_combo.setCurrentIndex(selected)
        ts_combo.currentIndexChanged.connect(
            self.table_ts_resolution_changed)
        ts_box.addWidget(ts_label)
        ts_box.addWidget(ts_combo)
        ts_row.setLayout(ts_box)
        ts_label.setFixedWidth(label_width)
        ts_combo.setFixedWidth(widget_width)
        ts_row.setFixedWidth(row_width)
        outer.addWidget(ts_row)

    # refresh rate
    rate_row = QWidget()
    rate_box = QHBoxLayout()
    rate_label = QLabel("Refresh rate: ")
    if self.notify_freq_hz_default <= 0:
        default_rate = 0
    else:
        default_rate = self.notify_freq_hz_default
    # combo box index -> frequency in Hz (0 = direct)
    self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1,
                                         5: 0.5, 6: default_rate}
    rate_combo = QComboBox(self)
    rate_combo.addItem('direct')
    for position in range(1, 6):
        rate_combo.addItem('{0} Hz'.format(
            self.refresh_freq_combox_idx_dict[position]))
    if default_rate == 0:
        rate_combo.addItem('default (direct)')
    else:
        rate_combo.addItem('default ({0} Hz)'.format(
            self.refresh_freq_combox_idx_dict[6]))
    for position, frequency in self.refresh_freq_combox_idx_dict.items():
        if frequency == self.notify_freq_hz:
            rate_combo.setCurrentIndex(position)
            break
    rate_combo.currentIndexChanged.connect(self.table_refresh_rate_changed)
    rate_box.addWidget(rate_label)
    rate_box.addWidget(rate_combo)
    rate_row.setLayout(rate_box)
    rate_label.setFixedWidth(label_width)
    rate_combo.setFixedWidth(widget_width)
    rate_row.setFixedWidth(row_width)
    outer.addWidget(rate_row)

    outer.setAlignment(Qt.AlignLeft)
    outer.setContentsMargins(10, 0, 0, 0)
    outer.setSpacing(0)
    dialog.setMinimumWidth(340)
    dialog.setLayout(outer)
    dialog.exec()
# Handle a mouse press on the table: resolve the row under the click,
# forward the event to that row's PV gateway when the row indexes into
# ``pv_list``, open ``table_context_menu`` on a right-button press, and
# give up keyboard focus.
# NOTE(review): block indentation is missing in this copy of the file, so it
# cannot be told from here whether ``else`` pairs with ``row > -1`` or with
# ``row < len(self.pv_list)``, nor whether ``clearFocus()`` runs
# unconditionally -- confirm against the indented original before editing.
def mousePressEvent(self, event):
# Row of the model index found at the click position.
row = self.indexAt(event.pos()).row()
if row > -1:
if row < len(self.pv_list):
# Delegate the press to the gateway object stored for this row.
self.pv_gateway[row].mousePressEvent(event)
else:
button = event.button()
if button == Qt.RightButton:
# Show the context menu at the current cursor position.
self.table_context_menu.exec(QCursor.pos())
self.clearFocus()
#remove highlighting which persists after mouse leaves
def mouseMoveEvent(self, event):
    """Ignore mouse-move events; nothing is passed on to the base class."""
    return None
def leaveEvent(self, event):
    """Drop the selection and the keyboard focus when the mouse leaves."""
    del event  # the event object itself is not needed
    self.clearSelection()
    self.clearFocus()
class QMessageWidget(QListWidget):
    """Log message window.

    The item most recently clicked is kept in ``myItem`` so that its text
    can be copied to the clipboard with the platform "copy" key sequence.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.myItem = None
        self.setFocusPolicy(Qt.StrongFocus)
        self.setSelectionMode(QAbstractItemView.ExtendedSelection)

    def leaveEvent(self, event):
        """Clear selection and focus on leave once an item has been clicked."""
        del event
        if not self.myItem:
            return
        self.clearSelection()
        self.clearFocus()

    def mousePressEvent(self, event):
        """Remember the item under the click and make it the current item."""
        clicked = self.itemAt(event.x(), event.y())
        if not clicked:
            return
        self.myItem = clicked
        self.setCurrentItem(clicked)

    def keyPressEvent(self, event):
        """Copy the remembered item's text when the copy shortcut is pressed."""
        if not event.matches(QKeySequence.Copy):
            return
        if not event.count():
            return
        if self.myItem is None:
            return
        QApplication.clipboard().setText(self.myItem.text())
class QResultsWidget:
    """Builder for a group box with summary fields above a two-column table.

    ``summary_dict`` maps a label to a value; every entry becomes one row
    made of a ``QLabel`` and a read-only ``QLineEdit``.  ``table_dict``
    feeds the table: its first entry supplies the two column headers and
    each further entry becomes one table row.
    """

    def __init__(self, summary_dict=None, table_dict=None):
        self.summary_dict = summary_dict
        self.table_dict = table_dict
        self._group_box = None

    @staticmethod
    def _longest(values):
        """Return ``str()`` of the longest entry in *values* (first on ties)."""
        return max((str(value) for value in values), key=len, default="")

    @staticmethod
    def _summary_row(label, text, font, label_width, edit_width, height):
        """Return a widget holding one label / read-only line-edit pair."""
        qlabel = QLabel(str(label))
        qlabel.setFont(font)
        qlabel.setStyleSheet(("QLabel{color:black;" +
                              "margin:0px; padding:2px;}"))
        qlabel.setFixedWidth(label_width)
        qlabel.setFixedHeight(height)

        qle = QLineEdit(str(text))
        qle.setFocusPolicy(Qt.NoFocus)
        qle.setFont(font)
        qle.setStyleSheet(("QLineEdit{color:blue;" +
                           "background-color: lightgray;" +
                           "qproperty-readOnly: true;" +
                           "margin:0px; padding:2px;}"))
        qle.setFixedWidth(edit_width)
        qle.setFixedHeight(height)
        qle.setAlignment(Qt.AlignRight)

        row_layout = QHBoxLayout()
        row_layout.addWidget(qlabel)
        row_layout.addWidget(qle)
        row_widget = QWidget()
        row_widget.setLayout(row_layout)
        row_layout.setAlignment(Qt.AlignCenter)
        row_layout.setContentsMargins(0, 2, 0, 0)
        return row_widget

    def _results_table(self, table_data, metrics):
        """Return the fixed-size table built from *table_data*."""
        # The first dict entry is the header, hence one row fewer than
        # entries; never ask Qt for a negative row count.
        table = QTableWidget(max(len(table_data) - 1, 0), 2)
        table.verticalHeader().setVisible(False)
        table.setFocusPolicy(Qt.NoFocus)
        for index, (label, text) in enumerate(table_data.items()):
            cells = (QTableWidgetItem(str(label)), QTableWidgetItem(str(text)))
            for column, cell in enumerate(cells):
                cell.setTextAlignment(Qt.AlignCenter)
                cell.setForeground(QColor("black"))
                if index % 2 == 0:
                    cell.setBackground(QColor("lightgray"))
                if index == 0:
                    table.setHorizontalHeaderItem(column, cell)
                else:
                    table.setItem(index - 1, column, cell)

        # Qt 4 uses a slightly smaller per-row height factor.
        qt_major = int(str(QT_VERSION_STR).split(".")[0])
        row_factor = 1.18 if qt_major < 5 else 1.2
        text_rect = metrics.boundingRect(
            self._longest(table_data) + self._longest(table_data.values()))
        table.resizeColumnsToContents()
        table.resizeRowsToContents()
        # Qt size setters take ints; the font metrics are floats.
        table.setFixedHeight(int(
            metrics.lineSpacing() * row_factor * len(table_data)
            + metrics.lineSpacing() * 2))
        table.setFixedWidth(int(text_rect.width() * 1.04))
        return table

    def group_box(self, title=""):
        """Create, store and return the populated ``QGroupBox``.

        Widths are derived from the longest label/value strings measured
        with the 10 pt "Sans Serif" font.  A ``None`` summary or table
        dict is treated as empty.
        """
        summary = self.summary_dict if self.summary_dict is not None else {}
        table_data = self.table_dict if self.table_dict is not None else {}

        self._group_box = QGroupBox(title)
        self._group_box.setObjectName("OUTERLEFT")

        font = QFont("Sans Serif", 10)
        metrics = QFontMetricsF(font)
        row_height = 25

        outer = QVBoxLayout()
        spacer = QFrame()
        spacer.setFixedHeight(10)
        outer.addWidget(spacer)

        label_width = int(
            metrics.boundingRect(self._longest(summary)).width() * 1.5)
        edit_width = int(
            metrics.boundingRect(self._longest(summary.values())).width()
            * 1.15)
        for label, text in summary.items():
            outer.addWidget(self._summary_row(
                label, text, font, label_width, edit_width, row_height))
        outer.setContentsMargins(0, 0, 0, 0)
        outer.setAlignment(Qt.AlignCenter | Qt.AlignTop)

        table = self._results_table(table_data, metrics)
        table_layout = QVBoxLayout()
        table_layout.setContentsMargins(0, 20, 0, 40)
        table_layout.addWidget(table)
        table_layout.setAlignment(Qt.AlignCenter | Qt.AlignTop)
        table_holder = QWidget()
        table_holder.setLayout(table_layout)
        outer.addWidget(table_holder)

        self._group_box.setLayout(outer)
        self._group_box.setContentsMargins(20, 20, 20, 20)
        self._group_box.setAlignment(Qt.AlignTop)
        self._group_box.setFixedHeight(
            table.height() + (row_height * len(summary)))
        self._group_box.setFixedWidth(table.width() + 20)
        return self._group_box
class QResultsTableWidget():
    """Builder for a group box that holds a one-row results table.

    ``column_headings`` is an iterable of column titles; each is converted
    with ``str()`` and used as a horizontal header item.
    """

    def __init__(self, column_headings=None):
        self.column_headings = column_headings
        self._group_box = None

    def group_box(self, title="Table of Results"):
        """Create, store and return the ``QGroupBox`` containing the table.

        A ``None`` ``column_headings`` yields a table without columns.
        """
        headings = list(self.column_headings or ())

        table = QTableWidget(1, len(headings))
        table.verticalHeader().setVisible(True)
        table.setFocusPolicy(Qt.NoFocus)
        table.setFont(QFont("Sans Serif", 10))
        for column, heading in enumerate(headings):
            table.setHorizontalHeaderItem(
                column, QTableWidgetItem(str(heading)))
        table.resizeColumnsToContents()
        table.resizeRowsToContents()
        table.setFixedHeight(400)

        # The layout is installed on the group box only; a layout can have
        # just one parent widget.
        layout = QVBoxLayout()
        layout.setContentsMargins(0, 20, 0, 40)
        layout.addWidget(table)
        layout.setAlignment(Qt.AlignCenter | Qt.AlignTop)

        self._group_box = QGroupBox(title)
        self._group_box.setObjectName("OUTER")
        self._group_box.setLayout(layout)
        self._group_box.setContentsMargins(20, 20, 20, 20)
        self._group_box.setAlignment(Qt.AlignTop)
        self._group_box.setFixedWidth(table.width() + 20)
        return self._group_box
class QHDFDockWidget(QDockWidget):
    """Dock widget that starts hidden and docked.

    On close, the parent window and the dock itself are set back to the
    geometry stored in ``geometry_from_qsettings``.  ``parent`` must
    provide ``application_geometry``, ``geometry()`` and ``setGeometry()``.
    """

    def __init__(self, title=None, parent=None):
        super().__init__(title, parent)
        self.is_docked = True
        self.parent = parent
        self.geometry_from_qsettings = self.parent.application_geometry
        self.topLevelChanged.connect(self._top_level_changed)
        self.setVisible(False)
        self.setFloating(False)
        # From here on the parent's current geometry is the reference.
        self.geometry_from_qsettings = self.parent.geometry()

    def closeEvent(self, event: QCloseEvent):
        """Close the dock and re-apply the stored geometry."""
        super().closeEvent(event)
        self.parent.setGeometry(self.geometry_from_qsettings)
        self.setGeometry(self.geometry_from_qsettings)
        # Pending events are processed before the parent geometry is
        # applied a second time.
        QApplication.processEvents()
        self.parent.setGeometry(self.geometry_from_qsettings)

    def changeEvent(self, event):
        """Ignore change events."""
        return None

    def _top_level_changed(self, is_floating):
        """Slot for ``topLevelChanged``; intentionally does nothing."""
        return None
class QNoDockWidget(QDockWidget):
    """Dock widget that is always put back into the floating state.

    It starts hidden and floating, offset from the parent's
    ``application_geometry``.  Whenever its top-level state changes it is
    hidden, floated again, and the parent geometry is restored.
    """

    def __init__(self, title=None, parent=None):
        super().__init__(title, parent)
        self.is_docked = True
        self.parent = parent
        self.geometry_from_qsettings = self.parent.application_geometry
        self.topLevelChanged.connect(self._top_level_changed)
        self.setVisible(False)
        self.setFloating(True)
        # Place the window at a fixed offset from the stored geometry.
        origin = self.geometry_from_qsettings
        self.move(origin.x() + 480, origin.y() + 350)

    def changeEvent(self, event):
        """Store the parent geometry when the sender looks like a button."""
        if "QAbstractButton" not in str(self.sender()):
            return
        self.geometry_from_qsettings = self.parent.geometry()

    def _top_level_changed(self):
        """Slot for ``topLevelChanged``: hide, re-float, reset geometry."""
        self.setVisible(False)
        self.setFloating(True)
        self.parent.setGeometry(self.geometry_from_qsettings)
        QApplication.processEvents()
class CAQStripChart(PlotWidget):
    '''Channel access enabled pyqtgraph.PlotWidget

    One curve is drawn per PV in ``pv_list``.  Every accepted monitor
    update appends the value to that channel's fixed-size buffer and
    redraws the channel's curve.
    '''

    def __init__(self, parent=None, pv_list: list = None,
                 monitor_callback=None, pv_within_daq_group: bool = False,
                 color_mode=None, show_units: bool = False, prefix: str = "",
                 suffix: str = "", notify_freq_hz: int = 0, title: str = "",
                 ylabel: str = "", force_ts_align=True, text_label=None,
                 pen_color_idx=0):
        '''Create the gateways, buffers, axes, curves and legend.

        ``parent``, ``monitor_callback``, ``pv_within_daq_group``,
        ``color_mode``, ``show_units``, ``prefix``, ``suffix`` and
        ``notify_freq_hz`` are passed on unchanged to each ``PVGateway``.
        ``pv_list`` defaults to ``['PV_NAME_NOT_GIVEN']`` and ``text_label``
        (legend texts, paired with the curves in order) to an empty list.
        ``pen_color_idx`` selects one of three colour palettes.
        ``force_ts_align`` is accepted but not used.
        '''
        super().__init__()
        # None sentinels replace the former mutable default arguments.
        if pv_list is None:
            pv_list = ['PV_NAME_NOT_GIVEN']
        if text_label is None:
            text_label = []
        self.no_channels = len(pv_list)
        self.pen_color_idx = pen_color_idx
        self.text_label = text_label
        self.found = False
        self.time_zero = [0] * self.no_channels
        self.time_delta = [0] * self.no_channels
        self.pv_list = pv_list
        self.pv2item_dict = {}
        self.pv_gateway = [None] * self.no_channels
        self.pvd_previous_list = [None] * self.no_channels
        self.val_previous = [None] * self.no_channels
        self.curve = [None] * self.no_channels

        # Data buffers are allocated before any monitor is started so that
        # the slots never see a half-initialised widget.
        sampleinterval = 0.2
        self.ts_delta_max = 0.6
        self._interval = int(sampleinterval * 1000)
        self._bufsize = 10000
        self._bufsize2 = 10000
        self.databuffer = [None] * self.no_channels
        self.timebuffer = [None] * self.no_channels
        self.x = [None] * self.no_channels
        self.y = [None] * self.no_channels
        self.x_shifted = [None] * self.no_channels
        self.idx = [0] * self.no_channels
        for i in range(self.no_channels):
            bsize = self._bufsize if i == 0 else self._bufsize2
            self.databuffer[i] = collections.deque([None] * bsize, bsize)
            self.timebuffer[i] = collections.deque([0] * bsize, bsize)
            # Builtin float: the np.float alias no longer exists in NumPy.
            self.x[i] = np.zeros(bsize, dtype=float)
            self.y[i] = np.zeros(bsize, dtype=float)
        self.iflag_series = 0

        for i, pv_name in enumerate(self.pv_list):
            gateway = PVGateway(
                parent, pv_name, monitor_callback, pv_within_daq_group,
                color_mode, show_units, prefix, suffix,
                connect_triggers=False, notify_freq_hz=notify_freq_hz,
                monitor_dbr_time=True)
            self.pv_gateway[i] = gateway
            gateway.is_initialize_complete()
            self.pvd_previous_list[i] = gateway.pvd
            gateway.trigger_connect.connect(self.receive_connect_update)
            gateway.trigger_monitor_str.connect(self.receive_monitor_update)
            gateway.trigger_monitor_int.connect(self.receive_monitor_update)
            gateway.trigger_monitor_float.connect(
                self.receive_monitor_update)
            gateway.trigger_monitor.connect(self.receive_monitor_dbr_time)
            gateway.widget_class = "PlotWidget"
            # Reverse lookup used by the slots to find the sender's channel.
            self.pv2item_dict[gateway] = i
        self.cafe = self.pv_gateway[0].cafe
        self.cyca = self.pv_gateway[0].cyca

        # Channels already connected get their connect update immediately.
        for gateway in self.pv_gateway:
            if self.cafe.isConnected(gateway.pv_name):
                gateway.trigger_connect.emit(
                    gateway.handle, str(gateway.pv_name),
                    gateway.cyca.ICAFE_CS_CONN)
        for gateway in self.pv_gateway:
            if not gateway.pv_within_daq_group:
                gateway.monitor_start()

        if title is not None:
            self.setTitle(str(title))
        self.showGrid(x=True, y=True)
        self.setLabel('bottom', 'time', 's')
        self.setBackground((60, 60, 60))

        left_axis = pg.AxisItem('left')
        left_axis.enableAutoSIPrefix(enable=False)
        left_axis.setLabel(ylabel, self.pv_gateway[0].units)
        left_axis.setGrid(155)
        bottom_axis = pg.AxisItem('bottom')
        bottom_axis.enableAutoSIPrefix(enable=False)
        bottom_axis.setLabel('time', 'min')
        bottom_axis.setGrid(175)
        if 'BPM' in text_label:
            left_axis.setTickSpacing(0.2, 0.1)
        self.setAxisItems({'left': left_axis, 'bottom': bottom_axis})
        pg.setConfigOption('leftButtonPan', False)
        # Mouse interaction is enabled on both axes.
        self.plotItem.setMouseEnabled(y=True)
        self.plotItem.setMouseEnabled(x=True)

        if self.pen_color_idx == 0:
            pen_list = [(255, 155, 0), (255, 255, 0), (0, 180, 255)]
        elif self.pen_color_idx == 1:
            pen_list = [(125, 249, 255), (255, 255, 0), (0, 180, 255)]
        else:
            pen_list = [(0, 180, 255), (125, 249, 255), (255, 255, 0)]
        for i in range(self.no_channels):
            # Colours repeat when there are more channels than pens.
            self.curve[i] = self.plot(
                self.x[i], self.y[i], pen=pen_list[i % len(pen_list)])

        legend = pg.LegendItem()
        legend.setParentItem(self.graphicsItem())
        legend.anchor((0, 0), (0.08, 0.0))
        for curv, label in zip(self.curve, self.text_label):
            legend.addItem(curv, label)
        QApplication.processEvents()

    def _record_sample(self, row, value):
        '''Append *value* to channel *row* and redraw that channel's curve.'''
        self.databuffer[row].append(value)
        self.timebuffer[row].append(self.time_delta[row])

        # Largest "oldest" time over all channels; the curve is drawn only
        # from that time on.
        first = self.timebuffer[0][0]
        highest_ts = first if first is not None else 0
        for buf in self.timebuffer[1:]:
            if buf[0] is not None and buf[0] > highest_ts:
                highest_ts = buf[0]
        if self.timebuffer[row][0] is not None:
            for i, val in enumerate(self.timebuffer[row]):
                if val > highest_ts:
                    self.idx[row] = i - 1
                    break

        self.y[row][:] = self.databuffer[row]
        self.x[row][:] = self.timebuffer[row]
        idx = self.idx[row]
        # Shift the time axis by the previous sample's time offset.
        self.x_shifted[row] = (
            self.x[row][idx:] - self.time_delta[row]).tolist()
        self.curve[row].setData(self.x_shifted[row], self.y[row][idx:])

    @Slot(object, int)
    def receive_monitor_dbr_time(self, pvdata, alarm_severity):
        '''Record a monitor update delivered together with its PV data.

        Updates carrying the same timestamp as the previous one, or an
        earlier seconds value, are dropped.
        '''
        _row = self.pv2item_dict[self.sender()]
        previous = self.pvd_previous_list[_row]
        if (pvdata.ts[0] == previous.ts[0]) and (
                pvdata.ts[1] == previous.ts[1]):
            return
        if pvdata.ts[0] < previous.ts[0]:
            print("Funny ts value", self.pv_gateway[_row].pv_name)
            pvdata.show()
            return
        value = pvdata.value[0]
        self.pvd_previous_list[_row] = pvdata
        self.val_previous[_row] = value
        self._record_sample(_row, value)
        # Offset from channel 0's time zero, divided by 60 (the bottom
        # axis is labelled in minutes).
        self.time_delta[_row] = ((
            pvdata.ts[0] + pvdata.ts[1]*10**(-9)) - self.time_zero[0])/60

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Record a monitor update delivered as a plain value.

        An update whose value equals the previous one is dropped.
        '''
        _row = self.pv2item_dict[self.sender()]
        gateway = self.pv_gateway[_row]
        print("row, value===>", _row, value, gateway.pv_name)
        _pvd = gateway.cafe.getPVCache(gateway.handle)
        _pvd2 = gateway.pvd
        print ("val", value, _pvd2.value[0], _pvd.value[0], self.pvd_previous_list[_row].value[0])
        if value == self.val_previous[_row]:
            _pvd.show()
            return
        self.pvd_previous_list[_row] = _pvd2
        self.val_previous[_row] = value
        self._record_sample(_row, value)
        # NOTE(review): this offset is not divided by 60, unlike
        # receive_monitor_dbr_time -- confirm which unit is intended.
        self.time_delta[_row] = (
            _pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero[0]

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        print("pv_name==>", pv_name)
        _row = self.pv2item_dict[self.sender()]
        gateway = self.pv_gateway[_row]
        gateway.receive_connect_update(handle, pv_name, status,
                                       post_display=False)
        _pvd = gateway.cafe.getPVCache(gateway.handle)
        # The first connect update fixes the channel's time zero.
        if self.time_zero[_row] == 0:
            self.time_zero[_row] = _pvd.ts[0] + _pvd.ts[1]*10**(-9)
        self.pvd_previous = _pvd

    # remove highlighting which persists after mouse leaves
    def mouseMoveEvent(self, event):
        '''Ignore mouse-move events.'''
        pass

    def leaveEvent(self, event):
        '''Give up keyboard focus when the mouse leaves the widget.'''
        self.clearFocus()
        del event
class CAQPCTChart(PlotWidget):
    '''Channel access enabled pyqtgraph.PlotWidget

    Plots the monitored value against time and emits
    ``trigger_series_sequence`` / ``trigger_decay_sequence`` with the
    corresponding (time, value) arrays.
    '''
    trigger_decay_sequence = Signal(np.ndarray, np.ndarray)
    trigger_series_sequence = Signal(np.ndarray, np.ndarray)

    def daq_start(self):
        '''Unblock this widget's signals.'''
        self.blockSignals(False)

    def daq_pause(self):
        '''Block this widget's signals.'''
        self.blockSignals(True)

    def daq_stop(self):
        '''Block this widget's signals.'''
        self.blockSignals(True)

    def __init__(self, parent=None, pv_list: list = None,
                 monitor_callback=None, pv_within_daq_group: bool = False,
                 color_mode=None, show_units: bool = False, prefix: str = "",
                 suffix: str = "", notify_freq_hz: int = 0):
        '''Create the gateways, buffers, plot decoration and curve.

        All arguments except ``pv_list`` are passed on unchanged to each
        ``PVGateway``.  ``pv_list`` defaults to ``['PV_NAME_NOT_GIVEN']``.
        The widget starts with its signals blocked (``daq_stop``).
        '''
        super().__init__()
        # None sentinel replaces the former mutable default argument.
        if pv_list is None:
            pv_list = ['PV_NAME_NOT_GIVEN']
        self.found = False
        self.time_zero = 0
        self.time_delta = 0
        self.pv_list = pv_list
        self.pv2item_dict = {}
        self.pv_gateway = [None] * len(self.pv_list)
        self.pvd_previous = None

        # Data buffers are allocated before any monitor is started.
        sampleinterval = 0.333
        timewindow = 1800.0
        self.ts_delta_max = 0.6
        self._interval = int(sampleinterval * 1000)
        self._bufsize = int(timewindow / sampleinterval)
        self.databuffer = collections.deque(
            [None] * self._bufsize, self._bufsize)
        self.timebuffer = collections.deque(
            [0] * self._bufsize, self._bufsize)
        _long_size = 20
        self.data_series_buffer = collections.deque(
            [0] * _long_size, _long_size)
        self.time_series_buffer = collections.deque(
            [0] * _long_size, _long_size)
        self.data_series = []
        self.time_series = []
        self.iflag_series = 0
        # Builtin float: the np.float alias no longer exists in NumPy.
        self.x = np.zeros(self._bufsize, dtype=float)
        self.y = np.zeros(self._bufsize, dtype=float)
        self.x_series = np.zeros(_long_size, dtype=float)
        self.y_series = np.zeros(_long_size, dtype=float)

        for i, pv_name in enumerate(self.pv_list):
            gateway = PVGateway(
                parent, pv_name, monitor_callback, pv_within_daq_group,
                color_mode, show_units, prefix, suffix,
                connect_triggers=False, notify_freq_hz=notify_freq_hz)
            self.pv_gateway[i] = gateway
            gateway.is_initialize_complete()
            gateway.trigger_connect.connect(self.receive_connect_update)
            gateway.trigger_monitor_str.connect(self.receive_monitor_update)
            gateway.trigger_monitor_int.connect(self.receive_monitor_update)
            gateway.trigger_monitor_float.connect(
                self.receive_monitor_update)
            gateway.widget_class = "PlotWidget"
            # Reverse lookup used by the slots to find the sender's index.
            self.pv2item_dict[gateway] = i
        self.cafe = self.pv_gateway[0].cafe
        self.cyca = self.pv_gateway[0].cyca

        # Channels already connected get their connect update immediately.
        for gateway in self.pv_gateway:
            if self.cafe.isConnected(gateway.pv_name):
                gateway.trigger_connect.emit(
                    gateway.handle, str(gateway.pv_name),
                    gateway.cyca.ICAFE_CS_CONN)
        for gateway in self.pv_gateway:
            if not gateway.pv_within_daq_group:
                gateway.monitor_start()

        self.setTitle("PCT(t)")
        self.showGrid(x=True, y=True)
        self.setLabel('left', 'I', 'mA')
        self.setLabel('bottom', 'time', 's')
        self.setBackground((60, 60, 60))
        self.setLimits(yMin=-0.11)
        # Mouse interaction on the x-axis only.
        self.plotItem.setMouseEnabled(y=False)
        self.plotItem.setMouseEnabled(x=True)
        self.curve = self.plot(self.x, self.y, pen=(125, 249, 255))
        legend = pg.LegendItem(offset=(0., 0.5))
        legend.setParentItem(self.graphicsItem())
        legend.setLabelTextColor((125, 249, 255))
        legend.addItem(self.curve, str(self.pv_gateway[0].pv_name))
        self.daq_stop()
        print(self._bufsize)
        print(len(self.x), len(self.y))

    def _update_series_fit(self, value, ts_now):
        '''Add a falling sample to the short series and fit a line to it.

        ``trigger_series_sequence`` is emitted with the series arrays when
        the fit's coefficient of determination exceeds 0.995.
        '''
        self.data_series_buffer.append(value)
        self.time_series_buffer.append(ts_now - self.time_zero)
        self.y_series[:] = self.data_series_buffer
        self.x_series[:] = self.time_series_buffer
        # NOTE(review): the deque is created full (20 entries), so this
        # length test is always true as written -- confirm the intent.
        if len(self.data_series_buffer) > 15:
            _x = self.x_series.reshape((-1, 1))
            model = LinearRegression()
            model.fit(_x, self.y_series)
            r_sq = model.score(_x, self.y_series)
            if r_sq > 0.995:
                self.trigger_series_sequence.emit(self.x_series,
                                                  self.y_series)
                self.data_series = []
                self.time_series = []

    @Slot(str, int, int)
    @Slot(int, int, int)
    @Slot(float, int, int)
    def receive_monitor_update(self, value, status, alarm_severity):
        '''Record a monitor update, redraw the curve and emit sequences.

        Updates with an unchanged timestamp are dropped.  An update more
        than 2 s after the previous one only refreshes the reference
        sample.  ``trigger_decay_sequence`` is emitted on the first rising
        sample after a non-rising one.
        '''
        _row = self.pv2item_dict[self.sender()]
        gateway = self.pv_gateway[_row]
        _pvd = gateway.cafe.getPVCache(gateway.handle)
        if self.pvd_previous is None:
            # No reference sample yet (no connect update has been handled):
            # adopt this one and wait for the next update.
            self.pvd_previous = _pvd
            return
        ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9)
        ts_previous = (self.pvd_previous.ts[0]
                       + self.pvd_previous.ts[1] * 10**(-9))
        ts_delta = ts_now - ts_previous
        if (_pvd.ts[0] == self.pvd_previous.ts[0]) and (
                _pvd.ts[1] == self.pvd_previous.ts[1]):
            return
        # discard first callbacks
        if ts_delta > 2.0:
            self.pvd_previous = _pvd
            return

        self.databuffer.append(value)
        self.y[:] = self.databuffer
        self.timebuffer.append(self.time_delta)
        self.x[:] = self.timebuffer

        if value < self.pvd_previous.value[0]:
            self._update_series_fit(value, ts_now)
        else:
            self.data_series = []
            self.time_series = []
        self.pvd_previous = _pvd

        # Shift the time axis by the previous sample's time offset.
        x_shifted = self.x - self.time_delta
        self.curve.setData(x_shifted, self.y)
        self.time_delta = ts_now - self.time_zero

        LOOK_BACK = -800
        if 'ARIDI-PCT2:CURRENT' in gateway.pv_name:
            LOOK_BACK = -250
        if value > self.y[-2]:
            if not self.found:
                max_index = self.y[LOOK_BACK:].argmax()
                if max_index == 0:
                    return
                print("max index=", max_index)
                self.found = True
                start = LOOK_BACK + max_index + 9
                self.trigger_decay_sequence.emit(
                    np.array(x_shifted[start:-2]), self.y[start:-2])
        else:
            self.found = False

    @Slot(int, str, int)
    def receive_connect_update(self, handle: int, pv_name: str, status: int):
        '''Triggered by connect signal'''
        print("pv_name==>", pv_name)
        _row = self.pv2item_dict[self.sender()]
        gateway = self.pv_gateway[_row]
        gateway.receive_connect_update(handle, pv_name, status,
                                       post_display=False)
        _pvd = gateway.cafe.getPVCache(gateway.handle)
        # The first connect update fixes the time zero.
        if self.time_zero == 0:
            self.time_zero = _pvd.ts[0] + _pvd.ts[1]*10**(-9)
        self.pvd_previous = _pvd

    # remove highlighting which persists after mouse leaves
    def mouseMoveEvent(self, event):
        '''Ignore mouse-move events.'''
        pass

    def leaveEvent(self, event):
        '''Give up keyboard focus when the mouse leaves the widget.'''
        self.clearFocus()
        del event