Files
caqtwidgets/pvwidgets.py

3538 lines
132 KiB
Python

''' Module with channel access enabled QtWidgets.'''
__author__ = 'Jan T. M. Chrin'
import collections
import numpy as np
import re
from threading import Lock
import time
from sklearn.linear_model import LinearRegression
from distutils.version import LooseVersion
from functools import reduce as func_reduce
from qtpy.QtCore import (
QEventLoop, QMutex, QPoint, Qt, QThread, QTimer, Signal, Slot)
from qtpy.QtGui import (
QCloseEvent, QColor, QCursor, QFont, QFontMetricsF, QIcon, QKeySequence)
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtWidgets import (
QAbstractItemView, QAbstractSpinBox, QAction, QApplication, QBoxLayout,
QCheckBox, QComboBox, QDialog, QDockWidget, QDoubleSpinBox, QFrame,
QGroupBox, QHBoxLayout, QLabel, QLineEdit, QListWidget, QMenu, QMessageBox,
QPushButton, QSpinBox, QStyle, QStyleOptionSpinBox, QTableWidget,
QTableWidgetItem, QVBoxLayout, QWidget)
import pyqtgraph as pg
from pyqtgraph import PlotWidget
from caqtwidgets.pvgateway import PVGateway
class QTaggedLineEdit(QWidget):
def __init__(self, label_text=str(""), value="",
position="LEFT", parent=None):
super(QTaggedLineEdit, self).__init__(parent)
self.parameter = str(value)
self.label = QLabel(label_text)
self.label.setObjectName("Tagged")
self.label.setFixedHeight(24)
self.label.setContentsMargins(10, 0, 0, 0)
#self.label.setFixedWidth(80)
self.line_edit = QLineEdit(self.parameter)
self.line_edit.setObjectName("Write")
self.line_edit.setFixedHeight(24)
font = QFont("sans serif", 16)
fm = QFontMetricsF(font)
self.line_edit.setMaximumWidth(int(fm.width(self.parameter)+20))
self.label.setBuddy(self.line_edit)
layout = QBoxLayout(
QBoxLayout.LeftToRight if position == "LEFT" else \
QBoxLayout.TopToBottom)
layout.addWidget(self.label)
layout.addWidget(self.line_edit)
layout.addStretch()
layout.setSpacing(2)
layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(layout)
class QHLine(QFrame):
def __init__(self):
super(QHLine, self).__init__()
self.setFrameShape(QFrame.HLine)
self.setFrameShadow(QFrame.Sunken)
class QVLine(QFrame):
def __init__(self):
super(QVLine, self).__init__()
self.setFrameShape(QFrame.VLine)
self.setFrameShadow(QFrame.Sunken)
class AppQLineEdit(QLineEdit):
def __init__(self, parent=None):
#super().__init__(parent)
pass
def leaveEvent(self, event):
self.clearFocus()
del event
class CAQLineEdit(QLineEdit, PVGateway):
'''Channel access enabled QLineEdit widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
trigger_daq = Signal(object, str, int)
trigger_daq_int = Signal(object, str, int)
trigger_daq_str = Signal(object, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units: bool = False, prefix: str = "", suffix: str = "",
notify_freq_hz: int = 0, precision: int = 0):
super().__init__(parent, pv_name, monitor_callback,
pv_within_daq_group, color_mode, show_units, prefix,
suffix, connect_callback=self.py_connect_callback,
notify_freq_hz=notify_freq_hz, precision=precision)
self.is_initialize_complete()
self.configure_widget()
if not self.pv_within_daq_group:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of pv connection status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(object, str, int)
def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state)
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.setFocusPolicy(Qt.NoFocus)
fm = QFontMetricsF(QFont("Sans Serif", 10))
qrect = fm.boundingRect(self.suggested_text)
_width_scaling_factor = 1.15
self.setFixedHeight(int(fm.lineSpacing()*1.8))
self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
if self.pv_within_daq_group:
self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA)
else:
self.qt_property_initial_values(qt_object_name=self.PV_READBACK)
#renove highlighting which persists after mouse leaves
def mouseMoveEvent(self, event):
#event.ignore()
pass
def leaveEvent(self, event):
self.clearFocus()
del event
class CAQLabel(QLabel, PVGateway):
'''Channel access enabled QLabel widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
trigger_daq = Signal(object, str, int)
trigger_daq_int = Signal(object, str, int)
trigger_daq_str = Signal(object, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units: bool = False, prefix: str = "", suffix: str = "",
notify_freq_hz: int = 0, precision: int = 0,
scale_factor: float = 1):
super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group,
color_mode, show_units, prefix, suffix,
connect_callback=self.py_connect_callback,
notify_freq_hz=notify_freq_hz, precision=precision)
self.scale_factor = scale_factor
self.is_initialize_complete()
self.configure_widget()
if self.pv_within_daq_group is False:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of
pv connection status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(object, str, int)
def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
if self.scale_factor != 1:
daq_pvd.value[0] = daq_pvd.value[0] * self.scale_factor
PVGateway.receive_daq_update(self, daq_pvd, daq_mode, daq_state)
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
if self.scale_factor != 1:
value = value * self.scale_factor
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.setFocusPolicy(Qt.NoFocus)
fm = QFontMetricsF(QFont("Sans Serif", 10))
qrect = fm.boundingRect(self.suggested_text)
_width_scaling_factor = 1.15
self.setFixedHeight(int(fm.lineSpacing()*1.8))
self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
if self.pv_within_daq_group:
self.qt_property_initial_values(qt_object_name=self.PV_DAQ_CA)
else:
self.qt_property_initial_values(qt_object_name=self.PV_READBACK)
#For use with CAQMenu
class QLineEditExtended(QLineEdit):
def __init__(self, parent=None):
super().__init__(parent)
self.parent = parent
def mousePressEvent(self, event):
button = event.button()
if button == Qt.RightButton:
self.parent.showContextMenu()
elif button == Qt.LeftButton:
self.parent.mousePressEvent(event)
class CAQMenu(QComboBox, PVGateway):
'''Channel access enabled QMenu widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units=False, prefix: str = "", suffix: str = ""):
super().__init__(parent, pv_name, monitor_callback,
pv_within_daq_group, color_mode, show_units, prefix,
suffix, connect_callback=self.py_connect_callback)
self.is_initialize_complete()
self.configure_widget()
#After configure:widget
self.currentIndexChanged.connect(self.value_change)
if self.pv_within_daq_group is False:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of
pv connection status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
def configure_widget(self):
self.previousIndex = None
self.setFocusPolicy(Qt.NoFocus)
self.setEditable(True)
self.setLineEdit(QLineEditExtended(self))
self.lineEdit().setReadOnly(True)
self.lineEdit().setAlignment(Qt.AlignCenter)
enumStringList = self.cafe.getEnumStrings(self.handle)
self.addItems(enumStringList)
for i in range(0, self.count()):
self.setItemData(i, Qt.AlignCenter, Qt.TextAlignmentRole)
fm = QFontMetricsF(QFont("Sans Serif", 10))
qrect = fm.boundingRect(self.suggested_text)
_width_scaling_factor = 1.1
self.setFixedHeight(round(fm.lineSpacing()*1.8))
self.setFixedWidth(round((qrect.width()+40) * _width_scaling_factor))
self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
def post_display_value(self, value):
'''Convert value to index'''
if "setCurrentIndex" in dir(self):
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
if isinstance(value, str):
self.setCurrentIndex(self.cafe.getEnumFromString(self.handle,
value))
elif isinstance(value, int):
self.setCurrentIndex(value)
#Should not happen
elif isinstance(value, float):
self.setCurrentIndex(int(value))
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
#self.previousIndex = self.currentIndex()
return
else:
print(("ERROR: overloaded post_display_value: 'setCurrentIndex' "
"method does not exist!"))
def value_change(self, indx):
status = self.cafe.set(self.handle, indx)
if status != self.cyca.ICAFE_NORMAL:
#self.showSetErrorMsg(status)
value = self.cafe.getCache(self.handle, 'int')
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
if value is not None:
self.setCurrentIndex(value)
else:
if self.previousIndex is not None:
self.setCurrentIndex(self.previousIndex)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
self.pv_message_in_a_box.setText(
"CAQMenu set operation reports error:\n{0}".format(
self.cafe.getStatusCodeAsString(status)))
self.pv_message_in_a_box.exec()
def mousePressEvent(self, event):
button = event.button()
if button == Qt.RightButton:
PVGateway.mousePressEvent(self, event)
elif self.pv_info is not None:
if self.pv_info.accessWrite == 0:
event.ignore()
return
QComboBox.mousePressEvent(self, event)
self.previousIndex = self.currentIndex()
def enterEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
for i in range(0, self.count()):
self.setItemIcon(i, QIcon(":/forbidden.png"))
self.setStyleSheet(
("QComboBox {background: transparent}" +
"QComboBox::drop-down {image: url(:/forbidden.png)}"))
def leaveEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
for i in range(0, self.count()):
self.setItemIcon(i, QIcon())
self.setStyleSheet(
"QComboBox::drop-down {background: transparent}")
#The widget should not gain focus by using the mouse wheel.
#This is accomplished by setting the focus policy to Qt.StrongFocus.
#The widget should only accept wheel events if it already has the
#focus. This is accomplished by reimplementing QWidget.wheelEvent
#within a QSpinBox subclass:
def wheelEvent(self, event):
if self.hasFocus() is False:
event.ignore()
else:
QComboBox.wheelEvent(self, event)
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
'''Triggered by monitor signal'''
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
class CAQMessageButton(QPushButton, PVGateway):
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
notify_freq_hz: int = 0,
pv_within_daq_group: bool = False, color_mode=None,
show_units=False, msg_label: str = "",
msg_press_value=None, msg_release_value=None,
start_monitor=False):
super().__init__(parent=parent, pv_name=pv_name,
monitor_callback=monitor_callback,
notify_freq_hz=notify_freq_hz,
pv_within_daq_group=pv_within_daq_group,
color_mode=color_mode, show_units=show_units,
msg_label=msg_label,
connect_callback=self.py_connect_callback)
self.msg_press_value = msg_press_value
self.msg_release_value = msg_release_value
if self.msg_press_value is not None:
self.pressed.connect(self.act_on_pressed)
if self.msg_release_value is not None:
self.released.connect(self.act_on_released)
self.msg_label = msg_label
self.suggested_text = self.msg_label
_suggested_text_length = len(self.suggested_text)+3
self.suggested_text = self.suggested_text.rjust(_suggested_text_length,
"^")
self.configure_widget()
self.msg_press_status = self.cyca.ICAFE_NORMAL
self.msg_release_status = self.cyca.ICAFE_NORMAL
self.msg_report_status = "PV={0}\n".format(self.pv_name)
self.msg_has_error = False
if not self.pv_within_daq_group and start_monitor:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of
pv connection status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.setFocusPolicy(Qt.StrongFocus)
self.setCheckable(True) #Recognizes press and release states
fm = QFontMetricsF(QFont("Sans Serif", 10))
qrect = fm.boundingRect(self.suggested_text)
_width_scaling_factor = 1.0
self.setText(self.msg_label)
self.setFixedHeight(round(fm.lineSpacing()*1.8))
self.setFixedWidth(round(qrect.width() * _width_scaling_factor))
self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
def enterEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
self.setProperty("readOnly", True)
self.qt_style_polish()
def leaveEvent(self, event):
if self.property("readOnly"):
self.setProperty(self.qt_dynamic_property_get(), True)
self.qt_style_polish()
def mouseReleaseEvent(self, event):
if self.msg_release_value is not None:
time.sleep(0.1)
QPushButton.mouseReleaseEvent(self, event)
def mousePressEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 1:
QPushButton.mousePressEvent(self, event)
if event.button() == Qt.RightButton:
PVGateway.mousePressEvent(self, event)
def act_on_pressed(self):
if self.msg_press_value is not None:
self.msg_press_status = self.cafe.set(self.handle,
self.msg_press_value)
if self.msg_press_status != self.cyca.ICAFE_NORMAL:
self.msg_report_status += (
"Error in set operation (at press button):\n{0}\n".format(
self.cafe.getStatusCodeAsString(self.msg_press_status)))
self.msg_has_error = True
qm = QMessageBox()
qm.setText(self.msg_report_status)
qm.exec()
QApplication.processEvents()
def act_on_released(self):
if self.msg_release_value is not None:
self.msg_release_status = self.cafe.set(self.handle,
self.msg_release_value)
if self.msg_release_status != self.cyca.ICAFE_NORMAL:
self.msg_report_status += (
"Error in set operation (at release button):\n{0}\n".format(
self.cafe.getStatusCodeAsString(self.msg_release_status)))
self.msg_has_error = True
if self.msg_has_error:
self.msg_has_error = False
self.pv_message_in_a_box.setText(self.msg_report_status)
self.pv_message_in_a_box.exec()
self.msg_report_status = "PV={0}\n".format(self.pv_name)
qm = QMessageBox()
qm.setText(self.msg_report_status)
qm.exec()
QApplication.processEvents()
class CAQTextEntry(QLineEdit, PVGateway):
'''Channel access enabled QTextEntry widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
def __init__(self, parent=None, pv_name: str="", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units=False, prefix: str="", suffix: str="",
precision: int=0):
super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group,
color_mode, show_units, prefix, suffix,
connect_callback=self.py_connect_callback,
precision=precision)
self.is_initialize_complete() #waits a fraction of a second
self.currentText = ""
self.returnPressed.connect(self.valuechange)
self.configure_widget()
if self.pv_within_daq_group is False:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of
pv connection status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
#print ("FONT-1", self.font().pixelSize(), value)
#print ("FONT-1", self.font().pointSize(), value)
#self.setFont(QFont("Sans Serif", 8))
#QLineEdit.setFont(self, QFont("Sans Serif", 8))
#self.font().setPixelSize(12)
#QLineEdit.font().setPixelSize(8)
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
#self.font().setPixelSize(12)
#print ("FONT-3", self.font().pixelSize(), value)
#print ("FONT-3", self.font().pointSize(), value)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.setFocusPolicy(Qt.StrongFocus)
#self.setFont(QFont("Sans Serif", 12))
#QLineEdit.setFont(self, QFont("Sans Serif", 8))
#f = QFont("Sans Serif")
#f.setPixelSize(11)
#self.setFont(f)
#fm = QFontMetricsF(f)
#As for CAQLabel
QLineEdit.setFont(self, QFont("Sans Serif", 12))
fm = QFontMetricsF(QFont("Sans Serif", 9))
qrect = fm.boundingRect(self.suggested_text)
_width_scaling_factor = 1.2
self.setFixedHeight(round(fm.lineSpacing()*1.8))
self.setFixedWidth(round(qrect.width()+20 * _width_scaling_factor))
self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
def valuechange(self):
status = self.cafe.set(self.handle, self.text())
if status != self.cyca.ICAFE_NORMAL:
if self.cafe.getNoMonitors(self.handle) == 0:
val = self.cafe.get(self.handle, 'native')
else:
val = self.cafe.getCache(self.handle, 'native')
if val is not None:
if isinstance(val, str):
strText = val
else:
valStr = ("{: .%sf}" % self.precision)
strText = valStr.format(round(val, self.precision))
#print(strText, " precision ", self.precision)
self.setText(strText)
else:
#Do this for TextInfo cache
if self.cafe.getNoMonitors(self.handle) == 0:
val = self.cafe.get(self.handle, 'native')
def setText(self, value):
QLineEdit.setText(self, value)
self.currentText = self.text()
#print ("FONT-2", self.font().pixelSize(), value)
#print ("FONT-2", self.font().pointSize(), value)
def enterEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
self.setProperty("readOnly", True)
self.qt_style_polish()
self.setReadOnly(True)
self.setFocusPolicy(Qt.StrongFocus)
def leaveEvent(self, event):
if self.isReadOnly():
self.setReadOnly(False)
self.setProperty(self.qt_dynamic_property_get(), True)
self.qt_style_polish()
if self.text() != self.currentText:
QLineEdit.setText(self, self.currentText)
self.setCursorPosition(100)
self.clearFocus()
self.setFocusPolicy(Qt.NoFocus)
del event
def mousePressEvent(self, event):
if event.button() == Qt.RightButton:
PVGateway.mousePressEvent(self, event)
self.clearFocus()
return
local_event_position = QPoint(event.x(), event.y())
local_cursor_position = self.cursorPositionAt(local_event_position)
self.setCursorPosition(local_cursor_position)
class CAQSpinBox(QSpinBox, PVGateway):
'''Channel access enabled QSpinBox widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units=False, prefix: str = "", suffix: str = ""):
super().__init__(parent, pv_name, monitor_callback, pv_within_daq_group,
color_mode, show_units, prefix, suffix,
connect_callback=self.py_connect_callback)
self.is_initialize_complete()
self.valueChanged.connect(self.value_change)
self.configure_widget()
if not self.pv_within_daq_group:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''
Callback function to be invoked on change of pv connection
status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.previousValue = None
self.currentValue = None
self.setFocusPolicy(Qt.StrongFocus)
self.setButtonSymbols(QAbstractSpinBox.UpDownArrows)
self.setAccelerated(False)
self.setLineEdit(QLineEditExtended(self))
self.lineEdit().setEnabled(True)
self.lineEdit().setReadOnly(False)
self.lineEdit().setAlignment(Qt.AlignLeft)
self.lineEdit().setFont(QFont("Sans Serif", 16))
fm = QFontMetricsF(QFont("Sans Serif", 12))
_suggested_text = self.max_control_abs_str
_added_text = ""
if self.show_units:
_added_text += " " + self.units
_suggested_text += self.units
if self.suffix:
_added_text += " " + self.suffix
_suggested_text += self.suffix
self.setSuffix(_added_text)
qrect = fm.boundingRect(_suggested_text)
_width_scaling_factor = 1.0
self.setFixedHeight(int(fm.lineSpacing()*1.8))
self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
if self.pv_ctrl is not None:
self.setRange(int(self.pv_ctrl.lowerControlLimit),
int(self.pv_ctrl.upperControlLimit))
def post_display_value(self, value):
'''Convert value to index'''
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
self.setValue(int(round(value)))
self.blockSignals(False)
else:
self.setValue(int(round(value)))
def mousePressEvent(self, event):
_opt = QStyleOptionSpinBox()
self.initStyleOption(_opt)
_rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
QStyle.SC_SpinBoxUp, self)
_rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
QStyle.SC_SpinBoxDown, self)
self.previousValue = self.value()
if event.button() == Qt.LeftButton:
if _rect_up.contains(event.pos(), proper=True) or \
_rect_down.contains(event.pos(), proper=True):
if not self.cafe.isConnected(self.handle):
self.pv_message_in_a_box.setText(
("Spinbox change value events currently suspended\n" +
"as channel {0} is disconnected.").format(
self.pv_name))
self.pv_message_in_a_box.exec()
return
QSpinBox.mousePressEvent(self, event)
#Clear Focus: only one step per mouse click.
self.clearFocus()
local_event_position = QPoint(event.x(), event.y())
local_cursor_position = self.lineEdit().cursorPositionAt(
local_event_position)
self.lineEdit().setCursorPosition(local_cursor_position)
PVGateway.mousePressEvent(self, event)
def setValue(self, intVal):
QSpinBox.setValue(self, intVal)
self.currentValue = self.value()
def value_change(self, intVal):
status = self.cafe.set(self.handle, intVal)
if status != self.cyca.ICAFE_NORMAL:
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
if self.previousValue is not None:
self.setValue(self.previousValue)
else:
_value = self.cafe.getCache(self.handle, 'int')
if _value is not None:
self.setValue(_value)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
self.pv_message_in_a_box.setText(
("Spinbox set operation reports error:\n{0}"
.format(self.cafe.getStatusCodeAsString(status))))
self.pv_message_in_a_box.exec()
else:
if self.previousValue is not None:
self.setValue(self.previousValue)
else:
_value = self.cafe.getCache(self.handle, 'int')
if _value is not None:
self.setValue(_value)
self.parent.statusbar.showMessage(
(self.widget_class + " " +
self.cafe.getStatusCodeAsString(status)))
def enterEvent(self, event):
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
self.setProperty("readOnly", True)
self.qt_style_polish()
self.setReadOnly(True)
self.setFocusPolicy(Qt.StrongFocus)
def leaveEvent(self, event):
if self.isReadOnly():
self.setReadOnly(False)
self.setProperty(self.qt_dynamic_property_get(), True)
self.qt_style_polish()
self.clearFocus()
self.setFocusPolicy(Qt.NoFocus)
del event
def keyPressEvent(self, event):
if event.key() in (Qt.Key_Return, Qt.Key_Enter):
QSpinBox.keyPressEvent(self, event)
self.clearFocus()
elif event.key() in (Qt.Key_Up, Qt.Key_Down):
QSpinBox.keyPressEvent(self, event)
else:
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
QSpinBox.keyPressEvent(self, event)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
# The spin box should not gain focus by using the mouse wheel.
# This is accomplished by setting the focus policy to Qt.StrongFocus.
# The spin box should only accept wheel events if it already has the focus.
# This is accomplished by reimplementing QWidget.wheelEvent within a
# QSpinBox subclass:
def wheelEvent(self, event):
#print("wheelEvent", self.hasFocus())
if self.hasFocus() is False:
event.ignore()
else:
QSpinBox.wheelEvent(self, event)
class CAQDoubleSpinBox(QDoubleSpinBox, PVGateway):
'''Channel access enabled QDoubleSpinBox widget'''
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int)
trigger_connect = Signal(int, str, int)
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units: bool = False, prefix: str = "", suffix: str = ""):
super().__init__(parent=parent, pv_name=pv_name,
monitor_callback=monitor_callback,
pv_within_daq_group=pv_within_daq_group,
color_mode=color_mode, show_units=show_units,
prefix=prefix, suffix=suffix,
connect_callback=self.py_connect_callback)
self.is_initialize_complete()
self.valueChanged.connect(self.valuechange)
self.configure_widget()
if self.pv_within_daq_group is False:
self.monitor_start()
def py_connect_callback(self, handle, pvname, status):
'''
Callback function to be invoked on change of pv connection
status.
'''
self.trigger_connect.emit(int(handle), str(pvname), int(status))
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
PVGateway.receive_monitor_update(self, value, status, alarm_severity)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
PVGateway.receive_connect_update(self, handle, pv_name, status)
def configure_widget(self):
self.previousValue = None
self.currentValue = None
self.setFocusPolicy(Qt.StrongFocus)
self.setButtonSymbols(QAbstractSpinBox.UpDownArrows)
self.setAccelerated(False)
self.setLineEdit(QLineEditExtended(self))
self.lineEdit().setReadOnly(False)
self.lineEdit().setAlignment(Qt.AlignRight)
self.lineEdit().setFont(QFont("Sans Serif", 12))
_stepsize = 10**(self.precision * -1)
self.setSingleStep(_stepsize)
self.setDecimals(self.precision)
fm = QFontMetricsF(QFont("Sans Serif", 12))
_suggested_text = self.suggested_text
_added_text = ""
if self.show_units:
_added_text += " " + self.units
_suggested_text += self.units
if self.suffix:
_added_text += " " + self.suffix
_suggested_text += self.suffix
self.setSuffix(_added_text)
qrect = fm.boundingRect(_suggested_text)
_width_scaling_factor = 1.15
self.setFixedHeight(int(fm.lineSpacing()*1.8))
self.setFixedWidth(int(qrect.width() * _width_scaling_factor))
self.qt_property_initial_values(qt_object_name=self.PV_CONTROLLER)
if self.pv_ctrl is not None:
self.setRange(int(self.pv_ctrl.lowerControlLimit),
int(self.pv_ctrl.upperControlLimit))
def post_display_value(self, value):
'''set value from monitor'''
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
self.setValue(value)
self.blockSignals(False)
else:
self.setValue(value)
def mousePressEvent(self, event):
_opt = QStyleOptionSpinBox()
self.initStyleOption(_opt)
_rect_up = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
QStyle.SC_SpinBoxUp, self)
_rect_down = self.style().subControlRect(QStyle.CC_SpinBox, _opt,
QStyle.SC_SpinBoxDown, self)
self.previousValue = self.value()
if event.button() == Qt.LeftButton:
if _rect_up.contains(event.pos(), proper=False) or \
_rect_down.contains(event.pos(), proper=False):
if not self.cafe.isConnected(self.handle):
self.pv_message_in_a_box.setText(
("Spinbox change value events currently suspended\n" +
"as channel {0} is disconnected.").format(
self.pv_name))
self.pv_message_in_a_box.exec()
return
QDoubleSpinBox.mousePressEvent(self, event)
local_event_position = QPoint(event.x(), event.y())
local_cursor_position = self.lineEdit().cursorPositionAt(
local_event_position)
self.lineEdit().setCursorPosition(local_cursor_position)
PVGateway.mousePressEvent(self, event)
def mouseReleaseEvent(self, event):
self.clearFocus()
self.lineEdit().clearFocus()
def setValue(self, value):
self.currentValue = self.value()
QDoubleSpinBox.setValue(self, value)
def valuechange(self, fval):
status = self.cafe.set(self.handle, fval)
if status != self.cyca.ICAFE_NORMAL:
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
if self.previousValue is not None:
self.setValue(self.previousValue)
else:
_value = self.cafe.getCache(self.handle, 'float')
if _value is not None:
self.setValue(_value)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
self.pv_message_in_a_box.setText(
("Spinbox set operation reports error:\n{0}"
.format(self.cafe.getStatusCodeAsString(status))))
self.pv_message_in_a_box.exec()
else:
if self.previousValue is not None:
self.setValue(self.previousValue)
else:
_value = self.cafe.getCache(self.handle, 'float')
if _value is not None:
self.setValue(_value)
self.parent.statusbar.showMessage(
(self.widget_class + " " +
self.cafe.getStatusCodeAsString(status)))
def enterEvent(self, event):
self.setFocusPolicy(Qt.StrongFocus)
if self.pv_info is not None:
if self.pv_info.accessWrite == 0:
self.setProperty("readOnly", True)
self.qt_style_polish()
self.setReadOnly(True)
def leaveEvent(self, event):
if self.isReadOnly():
self.setReadOnly(False)
self.setProperty(self.qt_dynamic_property_get(), True)
self.qt_style_polish()
self.clearFocus()
self.lineEdit().clearFocus()
self.lineEdit().setFocusPolicy(Qt.NoFocus)
self.setFocusPolicy(Qt.NoFocus)
del event
def keyPressEvent(self, event):
if event.key() in (Qt.Key_Return, Qt.Key_Enter):
QDoubleSpinBox.keyPressEvent(self, event)
self.clearFocus()
self.lineEdit().clearFocus()
self.lineEdit().setFocusPolicy(Qt.NoFocus)
self.setFocusPolicy(Qt.NoFocus)
elif event.key() in (Qt.Key_Up, Qt.Key_Down):
QDoubleSpinBox.keyPressEvent(self, event)
else:
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(True)
QDoubleSpinBox.keyPressEvent(self, event)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.blockSignals(False)
# The spin box should not gain focus by using the mouse wheel.
# This is accomplished by setting the focus policy to Qt.StrongFocus.
# The spin box should only accept wheel events if it already has the focus.
# This is accomplished by reimplementing QWidget.wheelEvent within a
# QSpinBox subclass:
def wheelEvent(self, event):
if self.hasFocus() is False:
event.ignore()
else:
QDoubleSpinBox.wheelEvent(self, event)
class reconnectQPushButton(QPushButton, QThread):
def __init__(self, parent=None):
super().__init__()
self.parent = parent
self.clicked.connect(self.onClicked)
self.isdirty = False
self._handles_to_reconnect = []
self.reconnectThread = None
def onClicked(self, event):
self._handles_to_reconnect = []
for i in range(0, len(self.parent.pv_gateway)):
if self.parent.item(
i, self.parent.no_columns-1).checkState() == Qt.Checked:
self._handles_to_reconnect.append(
self.parent.pv_gateway[i].handle)
self.reconnect()
QApplication.processEvents()
def reconnect(self):
QApplication.processEvents()
self.isdirty = True
if self._handles_to_reconnect:
self.parent.cafe.reconnect(self._handles_to_reconnect)
self.isdirty = False
#Uncheck reconnected channels
for i in range(0, len(self.parent.pv_gateway)):
if self.parent.item(
i, self.parent.no_columns-1).checkState() == Qt.Checked:
if self.parent.cafe.isConnected(
self.parent.pv_gateway[i].handle):
self.parent.item(
i, self.parent.no_columns-1).setCheckState(False)
else:
#Even if not connected - uncheck box as action is complete
self.parent.item(
i, self.parent.no_columns-1).setCheckState(False)
#Uncheck global reconnect check box
self.parent.cb_item_all.setCheckState(Qt.Unchecked)
class CAQTableWidget(QTableWidget):
'''Channel access enabled QTableWidget widget'''
#trigger_monitor_float = Signal(float, int, int)
#trigger_monitor_int = Signal(int, int, int)
#trigger_monitor_str = Signal(str, int, int)
#trigger_connect = Signal(int, str, int)
def hasNewData(self, _row, pv_data):
if self.pv_gateway[_row].pvd_previous is None:
return True
newDataFlag = False
if self.pv_gateway[_row].pvd_previous.ts[1] != pv_data.ts[1]:
newDataFlag = True
elif self.pv_gateway[_row].pvd_previous.ts[0] != pv_data.ts[0]:
newDataFlag = True
# Catch disconnect events(!!) and set newDataFlag only
elif self.pv_gateway[_row].pvd_previous.status != pv_data.status:
newDataFlag = True
return newDataFlag
def paint_rows(self, row_range: list=[], reset=False, last_row=[" ", " "],
columns=[0]):
_qcolor_last_line = QColor("#d1e8e9")
self.font_pts8 = QTableWidgetItem().font()
self.font_pts8.setPointSize(8)
if not row_range:
row_range = [0, self.rowCount()-1]
if reset:
_qcolor = self.item(0, self.columnCount()-1).background()
_start = 0
_end = self.rowCount()-1
else:
_qcolor = _qcolor_last_line
_start = row_range[0]
_end = row_range[1]
for _row in range(_start, _end):
#if not reset:
#_cell.setFont(self.font_pts8)
if 1 in columns:
self.item(_row, 0).setBackground(_qcolor)
self.item(_row, 0).setFont(self.font_pts8)
if 0 in columns:
_cell = QTableWidgetItem("{0}".format(_row+1))
#if not reset:
_cell.setFont(self.font_pts8)
#_cell.setTextAlignment(Qt.AlignCenter)
self.setVerticalHeaderItem(_row, _cell)
#last row
if reset and 0 in columns:
_cell = QTableWidgetItem("{0}".format(last_row[0]))
_cell.setFont(self.font_pts8)
self.setVerticalHeaderItem(self.rowCount()-1, _cell)
self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter)
self.item(self.rowCount()-1, 0).setText(str(last_row[1]))
self.item(self.rowCount()-1, 0).setBackground(_qcolor)
self.item(self.rowCount()-1, 0).setFont(self.font_pts8)
elif last_row[0] != " ":
_cell = QTableWidgetItem("{0}".format(last_row[0]))
_cell.setBackground(_qcolor_last_line)
_cell.setFont(self.font_pts8)
self.setVerticalHeaderItem(self.rowCount()-1, _cell)
if columns:
self.item(self.rowCount()-1, 0).setTextAlignment(Qt.AlignCenter)
self.item(self.rowCount()-1, 0).setText(str(last_row[1]))
self.item(self.rowCount()-1, 0).setBackground(_qcolor_last_line)
self.item(self.rowCount()-1, 0).setFont(self.font_pts8)
def widget_update(self):
""" Called when self.notif_unison is True """
for _row, pvgate in enumerate(self.pv_gateway):
if not pvgate.notify_unison:
continue
_handle = pvgate.handle
_pvd = pvgate.cafe.getPVCache(_handle)
if _pvd.status in (self.cyca.ICAFE_CS_NEVER_CONN,
self.cyca.ICAFE_CA_OP_CONN_DOWN):
pvgate.pvd_previous = _pvd
continue
pvgate.pvd_previous = _pvd
#if timestamps the same - then skip
_value = _pvd.value[0]
if self.scale_factor != 1:
_value = _value * self.scale_factor
_value = pvgate.format_display_value(_value)
qtwi = QTableWidgetItem(str(_value)+ " ")
f = qtwi.font()
f.setPointSize(8)
qtwi.setFont(f)
val_col_no = self.columns_dict['Value']
if self.show_timestamp:
ts_col_no = self.columns_dict['Timestamp']
self.setItem(_row, val_col_no,
QTableWidgetItem(qtwi))
self.item(_row, val_col_no).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
_ts_date = _pvd.tsDateAsString
_ts_str_len = len(_ts_date)
_ilength_target = self.format_ts_nano
while _ts_str_len < _ilength_target:
_ts_date += "0"
_ilength_target = _ilength_target - 1
_ts_str_len = len(_ts_date)
_ts_str = _ts_date[0: _ts_str_len - (self.format_ts_nano -
self.format_ts_decimal_part)]
_ts_str_len = len(_ts_str)
_ilength_target = self.format_ts_decimal_part
if self.format_ts_decimal_part == self.format_ts_deci:
if _ts_str_len == self.format_ts_sec:
_ts_str += "."
while _ts_str_len < _ilength_target:
_ts_str += "0"
_ilength_target = _ilength_target -1
qtwi = QTableWidgetItem(_ts_str)
f = qtwi.font()
f.setPointSize(8)
qtwi.setFont(f)
if self.show_timestamp:
self.setItem(_row, ts_col_no, QTableWidgetItem(qtwi))
self.item(_row, ts_col_no).setTextAlignment(Qt.AlignCenter)
_prop = pvgate.qt_dynamic_property_get()
alarm_severity = _pvd.alarmSeverity
if _prop == pvgate.READBACK_ALARM:
if alarm_severity == pvgate.cyca.SEV_MAJOR:
_bgcolor = pvgate.fg_alarm_major
_fgcolor = "black"
elif alarm_severity == pvgate.cyca.SEV_MINOR:
_bgcolor = pvgate.fg_alarm_minor
_fgcolor = "black"
elif alarm_severity == pvgate.cyca.SEV_INVALID:
_bgcolor = pvgate.fg_alarm_invalid
_fgcolor = "#777777"
else:
_bgcolor = pvgate.fg_alarm_noalarm
_fgcolor = "black"
#Colors for bg/fg reversed as is the old norm
self.item(_row, val_col_no).setBackground(QColor(_bgcolor))
self.item(_row, val_col_no).setForeground(QColor(_fgcolor))
if self.show_timestamp:
self.item(_row, ts_col_no).setBackground(QColor(_bgcolor))
self.item(_row, ts_col_no).setForeground(QColor(_fgcolor))
elif _prop == pvgate.READBACK_STATIC:
self.item(_row, val_col_no).setBackground(
QColor(pvgate.bg_readback))
#self.item(_row, val_col_no).setBackground(
# QColor('#aeae66'))
if self.show_timestamp:
self.item(_row, ts_col_no).setBackground(
QColor(pvgate.bg_readback))
elif _prop == pvgate.DISCONNECTED:
self.item(_row, val_col_no).setBackground(
QColor("#ffffff"))
self.item(_row, val_col_no).setForeground(
QColor("#777777"))
if self.show_timestamp:
self.item(_row, ts_col_no).setBackground(QColor("#ffffff"))
self.item(_row, ts_col_no).setForeground(QColor("#777777"))
else:
print(_prop, "widget_update unknown in element/row", _row,
_row+1)
QApplication.processEvents()
def __init__(self, parent=None, pv_list: list = ["PV_NAME_NOT_GIVEN"],
monitor_callback=None, pv_within_daq_group: bool = False,
color_mode=None, show_units: bool = True, prefix: str = "",
suffix: str = "", ts_res: str = "milli",
init_column: bool = False, init_list: list = [],
standby_column: bool = False, standby_list: list = [],
standby_values: list = [], set_delay: float = 0.0,
notify_freq_hz: int = 0, notify_unison: bool = True,
precision: int = 0, scale_factor: float = 1,
show_timestamp: bool = True, pv_list_show: list = None):
super().__init__()
self.columns_dict = {}
_column_dict_value = 0
if pv_list_show is not None:
if pv_list_show[0]:
self.columns_dict['PV'] = _column_dict_value
_column_dict_value += 1
else:
self.columns_dict['PV'] = _column_dict_value
_column_dict_value += 1
if init_column:
self.columns_dict['Init'] = _column_dict_value
_column_dict_value += 1
if standby_column:
self.columns_dict['Standby'] = _column_dict_value
_column_dict_value += 1
self.columns_dict['Value'] = _column_dict_value
if show_timestamp:
_column_dict_value += 1
self.columns_dict['Timestamp'] = _column_dict_value
_column_dict_value += 1
self.columns_dict['Reconnect'] = _column_dict_value
self.setWindowModality(Qt.ApplicationModal)
self.no_columns = _column_dict_value + 1
self.init_column = init_column
self.init_list = init_list
if self.init_column and not self.init_list:
self.init_list = pv_list
self.standby_column = standby_column
self.standby_list = standby_list
if self.standby_column and not self.standby_list:
self.standby_list = pv_list
self.standby_values = standby_values
self.set_delay = set_delay
self.icount = 0
self.notify_freq_hz = abs(notify_freq_hz)
self.notify_freq_hz_default = self.notify_freq_hz
self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \
int(1000 / self.notify_freq_hz)
self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz)
self.precision = precision
self.scale_factor = scale_factor
self.show_timestamp = show_timestamp
self.format_ts_nano = 31 #max length of date
self.format_ts_micro = 28
self.format_ts_milli = 25
self.format_ts_deci = 23 #-8
self.format_ts_sec = 21
if "nano" in ts_res.lower():
self.format_ts_decimal_part = self.format_ts_nano
elif "micro" in ts_res.lower():
self.format_ts_decimal_part = self.format_ts_micro
elif "milli" in ts_res.lower():
self.format_ts_decimal_part = self.format_ts_milli
elif "deci" in ts_res.lower():
self.format_ts_decimal_part = self.format_ts_deci
elif "sec" in ts_res.lower():
self.format_ts_decimal_part = self.format_ts_sec
else:
self.format_ts_decimal_part = self.format_ts_milli
self.pv2item_dict = {}
self.pv_list = pv_list
self.pv_gateway = [None] * len(self.pv_list)
self.pv_list_show = pv_list_show
if self.pv_list_show is None:
self.pv_list_show = self.pv_list
_color_mode = [None] * len(self.pv_list)
if color_mode is not None:
if isinstance(color_mode, list):
for i in range(0, len(color_mode)):
_color_mode[i] = color_mode[i]
else:
for i in range(0, len(_color_mode)):
_color_mode[i] = color_mode
for i in range(0, len(self.pv_list)):
self.pv_gateway[i] = PVGateway(
parent, self.pv_list[i], monitor_callback,
pv_within_daq_group, _color_mode[i], show_units, prefix, suffix,
connect_triggers=False, notify_freq_hz=self.notify_freq_hz,
notify_unison=self.notify_unison, precision=self.precision)
self.pv_gateway[i].is_initialize_complete()
self.pv_gateway[i].trigger_connect.connect(
self.receive_connect_update)
self.pv_gateway[i].trigger_monitor_str.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_int.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_float.connect(
self.receive_monitor_update)
self.pv_gateway[i].widget_class = "QTableWidgetItem"
self.pv_gateway[i].qt_property_initial_values(
qt_object_name=self.pv_gateway[i].PV_READBACK, tool_tip=False)
#required for reconnect
self.cafe = self.pv_gateway[0].cafe
self.cyca = self.pv_gateway[0].cyca
self.timer = None
if self.notify_unison:
self.timer = QTimer()
self.timer.timeout.connect(self.widget_update)
self.timer.singleShot(0, self.widget_update)
self.timer.start(self.notify_milliseconds)
self.reconnect_button = None
self.configure_widget()
#Connect only deals with colours - only helps on reconnect
# In any case monitors take over
#Got to do this earlier or emit immediately after!!
for i in range(0, len(self.pv_gateway)):
if self.cafe.isConnected(self.pv_gateway[i].pv_name):
self.pv_gateway[i].trigger_connect.emit(
self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name),
self.pv_gateway[i].cyca.ICAFE_CS_CONN)
for i in range(0, len(self.pv_gateway)):
if not self.pv_gateway[i].pv_within_daq_group:
self.pv_gateway[i].monitor_start()
if init_column:
self.update_init_values()
if standby_column:
self.update_standby_values()
self.configure_context_menu()
def configure_context_menu(self):
self.table_context_menu = QMenu()
self.table_context_menu.setObjectName("contextMenu")
self.table_context_menu.setWindowModality(Qt.NonModal)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.table_context_menu.addSection("---")
action1 = QAction("Configure Table PVs", self)
action1.triggered.connect(self.display_table_parameters)
self.table_context_menu.addAction(action1)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.table_context_menu.addSection("---")
QApplication.processEvents()
def set_standby_values(self, pv_list: list = []):
if self.init_column:
self.init_value_button.setEnabled(False)
self.restore_value_button.setEnabled(False)
_text = self.standby_value_button.text()
self.standby_value_button.setText("Downing")
self.standby_value_button.setEnabled(False)
if self.reconnect_button is not None:
self.reconnect_button.setEnabled(False)
_set_values_dict = self.get_standby_values()
#return 1,2,3
if not pv_list:
_pvs_to_set, _values_to_set = zip(*_set_values_dict.items())
#zip returns tuples
_pvs_to_set = list(_pvs_to_set)
_values_to_set = list(_values_to_set)
else:
_pvs_to_set = []
_values_to_set = []
for pv in pv_list:
if pv in _set_values_dict.keys():
_pvs_to_set.append(pv)
_values_to_set.append(_set_values_dict[pv])
#self.standby_value_button.setEnabled(False)
#QApplication.processEvents(QEventLoop.AllEvents, 1.0)
if self.set_delay <= 0:
status, status_list = self.cafe.setScalarList(_pvs_to_set,
_values_to_set)
else:
status_list = [self.cyca.ICAFE_NORMAL] * len(_pvs_to_set)
status = self.cyca.ICAFE_NORMAL
for i, (pv, val) in enumerate(zip(_pvs_to_set, _values_to_set)):
status_list[i] = self.cafe.set(pv, val)
if status_list[i] != self.cyca.ICAFE_NORMAL:
status = status_list[i]
##self.standby_value_button.setEnabled(False)
##QApplication.processEvents(QEventLoop.AllEvents, 1.0)
time.sleep(self.set_delay)
##self.standby_value_button.setEnabled(False)
#QApplication.sendPostedEvents()
QApplication.processEvents(QEventLoop.AllEvents, 1)
if status != self.cyca.ICAFE_NORMAL:
_mess = ("The following devices reported an error " +
"in 'set' operation:")
for i, status_value in enumerate(status_list):
if status_value != self.cyca.ICAFE_NORMAL:
_mess += ("\n" + _pvs_to_set[i] + " has status = " +
str(status_value) + " " +
self.cafe.getStatusCodeAsString(status_value)
+ " " + self.cafe.getStatusInfo(status_value))
qm = QMessageBox()
qm.setText(_mess)
#qm.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
#qm.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
#qm.setFixedWidth(800)
##self.standby_value_button.setEnabled(False)
##QApplication.sendPostedEvents()
qm.exec()
#QApplication.processEvents()
if self.init_column:
self.init_value_button.setEnabled(True)
self.restore_value_button.setEnabled(True)
self.standby_value_button.setText(_text)
self.standby_value_button.setEnabled(True)
if self.reconnect_button is not None:
self.reconnect_button.setEnabled(True)
return status, status_list, _pvs_to_set
def restore_init_values(self, pv_list: list = []):
_text = self.restore_value_button.text()
self.restore_value_button.setText("Restoring")
self.restore_value_button.setEnabled(False)
if self.init_column:
self.init_value_button.setEnabled(False)
if self.standby_column:
self.standby_value_button.setEnabled(False)
if self.reconnect_button is not None:
self.reconnect_button.setEnabled(False)
_set_values_dict = self.get_init_values()
if not pv_list:
_pvs_to_set, _values_to_set = zip(*_set_values_dict.items())
#zip returns tuples
_pvs_to_set = list(_pvs_to_set)
_values_to_set = list(_values_to_set)
else:
_pvs_to_set = []
_values_to_set = []
for pv in pv_list:
if pv in _set_values_dict.keys():
_pvs_to_set.append(pv)
_values_to_set.append(_set_values_dict[pv])
if self.set_delay <= 0:
status, status_list = self.cafe.setScalarList(_pvs_to_set,
_values_to_set)
else:
status_list = [self.cyca.ICAFE_NORMAL] * len(_pvs_to_set)
status = self.cyca.ICAFE_NORMAL
for i, (pv, val) in enumerate(zip(_pvs_to_set, _values_to_set)):
status_list[i] = self.cafe.set(pv, val)
if status_list[i] != self.cyca.ICAFE_NORMAL:
status = status_list[i]
time.sleep(self.set_delay)
QApplication.processEvents(QEventLoop.AllEvents, 1)
if status != self.cyca.ICAFE_NORMAL:
_mess = ("The following device(s) reported an error " +
"in 'set' operation:")
for i, status_value in enumerate(status_list):
if status_value != self.cyca.ICAFE_NORMAL:
_mess += ("\n" + _pvs_to_set[i] + " has status = " +
str(status_value) + " " +
self.cafe.getStatusCodeAsString(status_value) +
" " + self.cafe.getStatusInfo(status_value))
qm = QMessageBox()
qm.setText(_mess)
qm.exec()
QApplication.processEvents()
if self.init_column:
self.init_value_button.setEnabled(True)
self.restore_value_button.setText(_text)
self.restore_value_button.setEnabled(True)
if self.standby_column:
self.standby_value_button.setEnabled(True)
if self.reconnect_button is not None:
self.reconnect_button.setEnabled(True)
return status, status_list, _pvs_to_set
def is_same_as_init_values(self):
_init_values_dict = self.get_column_values(self.columns_dict['Init'])
_pvs, _init_values = zip(*_init_values_dict.items())
_current_values_dict = self.get_column_values(
self.columns_dict['Value'])
_pvs, _current_values = zip(*_current_values_dict.items())
#zip returns tuples
return bool(func_reduce(lambda i, j: i and j, map(
lambda m, k: m == k, _init_values, _current_values), True))
#if func_reduce(lambda i, j: i and j, map(
# lambda m, k: m == k, _init_values, _current_values), True):
# return True
#else:
# return False
def get_column_values(self, column_no):
_values_dict = {}
_start = 0
_end = len(self.pv_gateway)
_pvs = [None] * _end
_values_str = [None] * _end
_values = [None] * _end
for _row in range(_start, _end):
_values_str[_row] = self.item(_row, column_no).text()
_pvs[_row] = self.item(_row, 0).text()
_value_list = [float(_value_list) for _value_list in re.findall(
r'-?\d+\.?\d*', _values_str[_row])]
if not _value_list:
print("row", _row, "values", _values_str[_row], _pvs[_row])
_values[_row] = _values_str[_row] #Can be enum string
else:
_values[_row] = _value_list[0]
if _pvs[_row] in self.pv_list_show:
_values_dict[self.pv_gateway[_row].pv_name] = _values[_row]
else:
_values_dict[_row] = _values[_row]
return _values_dict #_pvs_to_set, _values_to_set
def get_init_values(self):
return self.get_column_values(self.columns_dict['Init'])
def get_standby_values(self):
return self.get_column_values(self.columns_dict['Standby'])
def get_init_values_previous(self):
_set_values_dict = {}
_start = 0
_end = len(self.pv_gateway)
_pvs_to_set = [None] * _end
_values_to_set_str = [None] * _end
_values_to_set = [None] * _end
for _row in range(_start, _end):
_values_to_set_str[_row] = self.item(
_row, self.columns_dict['Init']).text()
_pvs_to_set[_row] = self.item(_row, self.columns_dict['PV']).text()
_value_list = [float(_value_list) for _value_list in re.findall(
r'-?\d+\.?\d*', _values_to_set_str[_row])]
if not _value_list:
print("//row", _row, "values", _values_to_set_str[_row],
_pvs_to_set[_row])
_values_to_set[_row] = _values_to_set_str[_row] #Can be enum str
else:
_values_to_set[_row] = _value_list[0]
if _pvs_to_set[_row] in self.init_list:
_set_values_dict[
self.pv_gateway[_row].pv_name] = _values_to_set[_row]
return _set_values_dict
def update_standby_values(self):
_start = 0
_end = len(self.standby_values)
if 'Standby' in self.columns_dict:
_column_no = self.columns_dict['Standby']
else:
return
for _row in range(_start, _end):
_value = self.standby_values[_row]
if self.scale_factor != 1:
_value = _value * self.scale_factor
_value = self.pv_gateway[_row].format_display_value(_value)
qtwi = QTableWidgetItem(str(_value)+ " ")
_f = qtwi.font()
_f.setPointSize(8)
qtwi.setFont(_f)
self.setItem(_row, _column_no, qtwi)
self.item(_row, _column_no).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
def set_init_values(self, values):
_start = 0
_end = min(len(self.pv_gateway), len(values))
if 'Init' in self.columns_dict:
_column_no = self.columns_dict['Init']
else:
return
for _row in range(_start, _end):
_value = self.pv_gateway[_row].format_display_value(values[_row])
qtwi = QTableWidgetItem(str(_value)+ " ")
_f = qtwi.font()
_f.setPointSize(8)
qtwi.setFont(_f)
self.setItem(_row, _column_no, qtwi)
self.item(_row, _column_no).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
def update_init_values(self):
_start = 0
_end = len(self.pv_gateway)
if 'Init' in self.columns_dict:
_column_no = self.columns_dict['Init']
else:
return
for _row in range(_start, _end):
_handle = self.pv_gateway[_row].handle
_value = self.pv_gateway[_row].cafe.getCache(_handle)
if _value is not None:
if self.scale_factor != 1:
_value = _value * self.scale_factor
_value = self.pv_gateway[_row].format_display_value(_value)
qtwi = QTableWidgetItem(str(_value)+ " ")
_f = qtwi.font()
_f.setPointSize(8)
qtwi.setFont(_f)
self.setItem(_row, _column_no, qtwi)
self.item(_row, _column_no).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
def configure_widget(self):
_column_width_pvname = 180
_column_width_value = 90
_column_width_timestamp = 210
_column_width_checkbox = 22
item_font = QTableWidgetItem().font()
item_font.setPixelSize(13)
self.setRowCount(len(self.pv_gateway)+1)
self.setColumnCount(self.no_columns)
self.setEditTriggers(QAbstractItemView.NoEditTriggers)
self.resizeColumnsToContents()
self.resizeRowsToContents()
#self.horizontalHeader().setStretchLastSection(True);
if 'PV' in self.columns_dict.keys():
self.setColumnWidth(self.columns_dict['PV'], _column_width_pvname)
_pv_column = self.columns_dict['PV']
self.setColumnWidth(self.columns_dict['Value'], _column_width_value)
if 'Init' in self.columns_dict.keys():
self.setColumnWidth(self.columns_dict['Init'], _column_width_value)
if 'Timestamp' in self.columns_dict.keys():
self.setColumnWidth(self.columns_dict['Timestamp'],
_column_width_timestamp)
self.setColumnWidth(self.columns_dict['Reconnect'],
_column_width_checkbox)
for i in range(0, len(self.pv_gateway)):
istart = 1
if 'PV' in self.columns_dict.keys():
qtwt = QTableWidgetItem(self.pv_list_show[i])
f = qtwt.font()
f.setPointSize(8)
qtwt.setFont(f)
self.setItem(i, _pv_column, qtwt)
self.item(i, _pv_column).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignHCenter|Qt.AlignVCenter))
else:
istart = 0
for i_column in range(istart, self.no_columns-1):
self.setItem(i, i_column, QTableWidgetItem(str("")))
self.item(i, i_column).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignHCenter|Qt.AlignVCenter))
self.pv2item_dict[self.pv_gateway[i]] = i
cb_item = QTableWidgetItem()
cb_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
cb_item.setCheckState(Qt.Unchecked)
cb_item.setTextAlignment(Qt.AlignCenter)
cb_item.setToolTip(self.pv_gateway[i].pv_name)
self.setItem(i, self.no_columns-1, cb_item)
self.item(i, self.no_columns-1).setTextAlignment(Qt.AlignCenter)
if self.init_column:
self.init_widget = QWidget()
self.init_layout = QHBoxLayout(self.init_widget)
self.init_value_button = QPushButton()
self.init_value_button.setText("Update")
f = self.init_value_button.font()
f.setPointSize(8)
self.init_value_button.setFont(f)
self.init_value_button.setFixedWidth(64)
self.init_value_button.clicked.connect(self.update_init_values)
self.init_value_button.setToolTip(
("Stores initial, pre-measurement value. Update is also " +
"typically executed automatically before analysis procedure."))
self.init_layout.addWidget(self.init_value_button)
self.init_layout.setAlignment(Qt.AlignCenter)
self.init_layout.setContentsMargins(1, 1, 1, 0) #Required
self.init_widget.setLayout(self.init_layout)
if 'PV' in self.columns_dict:
self.setCellWidget(len(self.pv_gateway), 0, self.init_widget)
else:
self.setCellWidget(len(self.pv_gateway),
self.columns_dict['Init'], self.init_widget)
self.restore_widget = QWidget()
self.restore_layout = QHBoxLayout(self.restore_widget)
self.restore_value_button = QPushButton()
self.restore_value_button.setStyleSheet(
"QPushButton{background-color: rgb(212, 219, 157);}")
self.restore_value_button.setText("Restore")
f = self.restore_value_button.font()
f.setPointSize(8)
self.restore_value_button.setFont(f)
self.restore_value_button.setFixedWidth(80)
self.restore_value_button.clicked.connect(self.restore_init_values)
self.restore_value_button.setToolTip(
("Restore devices to their pre-measurement values"))
self.restore_layout.addWidget(self.restore_value_button)
self.restore_layout.setAlignment(Qt.AlignCenter)
self.restore_layout.setContentsMargins(1, 1, 0, 0)
self.restore_widget.setLayout(self.restore_layout)
if 'PV' in self.columns_dict:
self.setCellWidget(len(self.pv_gateway),
self.columns_dict['Init'],
self.restore_widget)
if self.standby_column:
_standby_widget = QWidget()
_standby_layout = QHBoxLayout(_standby_widget)
self.standby_value_button = QPushButton()
self.standby_value_button.setStyleSheet(
"QPushButton{background-color: rgb(212, 219, 157);}")
self.standby_value_button.setText("Standby")
f = self.standby_value_button.font()
f.setPointSize(8)
self.standby_value_button.setFont(f)
self.standby_value_button.setFixedWidth(80)
self.standby_value_button.clicked.connect(self.set_standby_values)
self.standby_value_button.setToolTip(
("Set devices to their pre-set standby values"))
_standby_layout.addWidget(self.standby_value_button)
_standby_layout.setAlignment(Qt.AlignCenter)
_standby_layout.setContentsMargins(1, 1, 0, 0)
_standby_widget.setLayout(_standby_layout)
self.setCellWidget(len(self.pv_gateway),
self.columns_dict['Standby'], _standby_widget)
#Do not display no for last row (Reconnect button)
_row_digit_last_cell = QTableWidgetItem(str(""))
self.setVerticalHeaderItem(len(self.pv_gateway), _row_digit_last_cell)
self.setItem(len(self.pv_gateway), 0, QTableWidgetItem(str("")))
_qwb = QWidget()
self.reconnect_button = reconnectQPushButton(self) #self required
f = self.reconnect_button.font()
if 'Timestamp' in self.columns_dict.keys():
f.setPixelSize(11) #previous 13
self.reconnect_button.setFixedWidth(100)
else:
f.setPixelSize(11) #6
self.reconnect_button.setFixedWidth(66) #58
self.reconnect_button.setFont(f)
self.reconnect_button.setText("Reconnect")
self.reconnect_button.setToolTip("Reconnect selected/checked channels")
_layout = QHBoxLayout(_qwb)
_layout.addWidget(self.reconnect_button)
_layout.setAlignment(Qt.AlignCenter)
_layout.setContentsMargins(0, 0, 0, 0) #Required
#_reconnect_button
self.setCellWidget(len(self.pv_gateway), self.no_columns-2, _qwb)
self.cb_item_all = QCheckBox()
self.cb_item_all.setCheckState(Qt.Unchecked)
self.cb_item_all.stateChanged.connect(self.reconnectStateChanged)
self.cb_item_all.setObjectName("Reconnect")
self.setCellWidget(len(self.pv_gateway), self.no_columns-1,
self.cb_item_all)
if 'PV' in self.columns_dict.keys():
header_item = QTableWidgetItem("Process Variable")
f = header_item.font()
f.setPixelSize(13)
header_item.setFont(f)
self.setHorizontalHeaderItem(self.columns_dict['PV'], header_item)
if 'Init' in self.columns_dict.keys():
header_item = QTableWidgetItem("Initial Value")
f = header_item.font()
f.setPixelSize(13)
header_item.setFont(f)
self.setHorizontalHeaderItem(self.columns_dict['Init'], header_item)
header_item = QTableWidgetItem("Value")
f = header_item.font()
f.setPixelSize(13)
header_item.setFont(f)
self.setHorizontalHeaderItem(self.columns_dict['Value'], header_item)
if 'Timestamp' in self.columns_dict.keys():
header_item = QTableWidgetItem("Timestamp")
f = header_item.font()
f.setPixelSize(13)
header_item.setFont(f)
self.setHorizontalHeaderItem(self.columns_dict['Timestamp'],
header_item)
header_item = QTableWidgetItem("R")
f = header_item.font()
f.setPixelSize(13)
header_item.setFont(f)
self.setHorizontalHeaderItem(self.columns_dict['Reconnect'],
header_item)
self.setFocusPolicy(Qt.NoFocus)
self.setEditTriggers(QAbstractItemView.NoEditTriggers)
self.setSelectionMode(QAbstractItemView.NoSelection)
self.verticalHeader().setDefaultAlignment(Qt.AlignRight)
self.verticalHeader().setFixedWidth(22)
_fm_font = QFont("Sans Serif")
_fm_font.setPointSize(12)
fm = QFontMetricsF(_fm_font)
_factor = 1
if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"):
_factor = 1.18
self.setFixedHeight(
int(fm.lineSpacing() * _factor * (len(self.pv_gateway)+3)))
_min_table_width = 620 if not self.init_column else 650
self.setMinimumWidth(_min_table_width)
for _row in range(0, len(self.pv_gateway)):
if 'PV' in self.columns_dict.keys():
self.item(_row, _pv_column).setForeground(QColor("#000000"))
istart = 1
else:
istart = 0
for i_column in range(istart, self.no_columns-2):
self.item(_row, i_column).setForeground(QColor("#000000"))
self.item(_row, i_column).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
self.item(_row, self.columns_dict['Value']).setBackground(
QColor("#ffffff"))
if 'Timestamp' in self.columns_dict.keys():
self.item(_row,
self.columns_dict['Timestamp']).setTextAlignment(
Qt.AlignCenter)
self.item(_row,
self.columns_dict['Timestamp']).setBackground(
QColor("#ffffff"))
@Slot(int)
def reconnectStateChanged(self, state):
if state == Qt.Unchecked:
for i in range(0, len(self.pv_gateway)):
self.item(i, self.columns_dict['Reconnect']).setCheckState(
Qt.Unchecked)
else:
for i in range(0, len(self.pv_gateway)):
self.item(i, self.columns_dict['Reconnect']).setCheckState(
Qt.Checked)
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
_row = self.pv2item_dict[self.sender()]
self.pv_gateway[_row].time_monotonic = time.monotonic()
if self.scale_factor != 1:
value = value * self.scale_factor
_value = self.pv_gateway[_row].format_display_value(value)
#if 'QMD10' in self.pv_gateway[_row].pv_name:
# print(_value + " from widgets.py" + self.pv_gateway[_row].pv_name)
qtwi = QTableWidgetItem(str(_value) + " ")
f = qtwi.font()
f.setPointSize(8)
qtwi.setFont(f)
qtwi.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable)
#qtwi.setEditTriggers(QAbstractItemView.AllEditTriggers)
self.setItem(_row, self.columns_dict['Value'], qtwi)
#self.item(_row, self.columns_dict['Value']).setFlags(
#Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable)
#self.item(_row, self.columns_dict['Value']).setEditTriggers(
#QAbstractItemView.AllEditTriggers)
self.item(_row, self.columns_dict['Value']).setTextAlignment(
Qt.AlignmentFlag(Qt.AlignRight|Qt.AlignVCenter))
if 'Timestamp' in self.columns_dict.keys():
_handle = self.pv_gateway[_row].handle
_pvd = self.pv_gateway[_row].cafe.getPVCache(_handle)
_ts_date = _pvd.tsDateAsString
_ts_str_len = len(_ts_date)
_ilength_target = self.format_ts_nano
while _ts_str_len < _ilength_target:
_ts_date += "0"
_ilength_target = _ilength_target -1
##ts_str_len = len(_ts_date)
_ts_str = _ts_date[0: _ts_str_len-(
self.format_ts_nano-self.format_ts_decimal_part)]
_ts_str_len = len(_ts_str)
_ilength_target = self.format_ts_decimal_part
if self.format_ts_decimal_part == self.format_ts_deci:
if _ts_str_len == self.format_ts_sec:
_ts_str += "."
while _ts_str_len < _ilength_target:
_ts_str += "0"
_ilength_target = _ilength_target -1
qtwi = QTableWidgetItem(_ts_str)
f = qtwi.font()
f.setPointSize(8)
qtwi.setFont(f)
self.setItem(_row, self.columns_dict['Timestamp'], qtwi)
self.item(_row, self.columns_dict['Timestamp']).setTextAlignment(
Qt.AlignCenter)
_prop = self.pv_gateway[_row].qt_dynamic_property_get()
if _prop == self.pv_gateway[_row].READBACK_ALARM:
if alarm_severity == self.pv_gateway[_row].cyca.SEV_MAJOR:
_bgcolor = self.pv_gateway[_row].fg_alarm_major
_fgcolor = "black"
elif alarm_severity == self.pv_gateway[_row].cyca.SEV_MINOR:
_bgcolor = self.pv_gateway[_row].fg_alarm_minor
_fgcolor = "black"
elif alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID:
_bgcolor = self.pv_gateway[_row].fg_alarm_invalid
_fgcolor = "#777777"
else:
_bgcolor = self.pv_gateway[_row].fg_alarm_noalarm
_fgcolor = "black"
#Colors for bg/fg reversed as is the old norm
self.item(_row, self.columns_dict['Value']).setBackground(
QColor(_bgcolor))
self.item(_row, self.columns_dict['Value']).setForeground(
QColor(_fgcolor))
if 'Timestamp' in self.columns_dict.keys():
self.item(_row, self.columns_dict['Timestamp']).setBackground(
QColor(_bgcolor))
self.item(_row, self.columns_dict['Timestamp']).setForeground(
QColor(_fgcolor))
elif _prop == self.pv_gateway[_row].DISCONNECTED or \
alarm_severity == self.pv_gateway[_row].cyca.SEV_INVALID:
self.item(_row, self.columns_dict['Value']).setBackground(
QColor("#ffffff"))
self.item(_row, self.columns_dict['Value']).setForeground(
QColor("#777777"))
if 'Timestamp' in self.columns_dict.keys():
self.item(_row, self.columns_dict['Timestamp']).setBackground(
QColor("#ffffff"))
self.item(_row, self.columns_dict['Timestamp']).setForeground(
QColor("#777777"))
elif _prop == self.pv_gateway[_row].READBACK_STATIC:
self.item(_row, self.columns_dict['Value']).setBackground(
QColor(self.pv_gateway[_row].bg_readback))
#self.item(_row, self.columns_dict['Value']).setBackground(
# QColor("#ffffe1"))
#self.item(_row, self.columns_dict['Value']).setFlags(
#Qt.ItemIsSelectable | Qt.ItemIsEnabled | Qt.ItemIsEditable)
if 'Timestamp' in self.columns_dict.keys():
self.item(_row, self.columns_dict['Timestamp']).setBackground(
QColor(self.pv_gateway[_row].bg_readback))
else:
print(_prop, self.pv_gateway[_row].DISCONNECTED,
"(in monitor) unknown in element/row no.", _row, _row+1)
QApplication.processEvents(QEventLoop.AllEvents, 10)
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
_row = self.pv2item_dict[self.sender()]
self.pv_gateway[_row].receive_connect_update(handle, pv_name, status,
post_display=False)
_prop = self.pv_gateway[_row].qt_dynamic_property_get()
#self.post_display_value(status)
if _prop == self.pv_gateway[_row].DISCONNECTED:
self.item(_row, self.columns_dict['Value']).setBackground(
QColor("#ffffff"))
self.item(_row, self.columns_dict['Value']).setForeground(
QColor("#777777"))
if 'Timestamp' in self.columns_dict.keys():
self.item(_row, self.columns_dict['Timestamp']).setBackground(
QColor("#ffffff"))
self.item(_row, self.columns_dict['Timestamp']).setForeground(
QColor("#777777"))
QApplication.processEvents()
def table_precision_user_changed(self, new_value):
self.pvgateway_precision = new_value
for pvgate in self.pv_gateway:
if pvgate.pv_ctrl is not None:
self.pvgateway_precision = min(pvgate.pv_ctrl.precision,
new_value)
pvgate.precision_user = self.pvgateway_precision
pvgate.precision = self.pvgateway_precision
_pvd = self.cafe.getPVCache(pvgate.handle)
if _pvd.value[0] is not None:
if isinstance(_pvd.value[0], float):
pvgate.trigger_monitor_float.emit(
_pvd.value[0], _pvd.status, _pvd.alarmSeverity)
def table_precision_ioc_reset(self):
if self.max_precision_value == self.table_precision_user_wgt.value():
self.table_precision_user_changed(self.max_precision_value)
else:
self.table_precision_user_wgt.setValue(self.max_precision_value)
def table_refresh_rate_changed(self, new_idx):
_notify_freq_hz = self.refresh_freq_combox_idx_dict[new_idx]
_notify_milliseconds = 0 if _notify_freq_hz == 0 else \
1000 / _notify_freq_hz
self.notify_freq_hz = _notify_freq_hz
if _notify_milliseconds == 0:
for pvgate in self.pv_gateway:
pvgate.notify_unison = False
pvgate.notify_milliseconds = _notify_milliseconds
pvgate.notify_freq_hz = self.notify_freq_hz
pvgate.monitor_stop()
time.sleep(0.01)
for pvgate in self.pv_gateway:
pvgate.monitor_start()
else:
for pvgate in self.pv_gateway:
if not pvgate.notify_unison:
pvgate.monitor_stop()
for pvgate in self.pv_gateway:
pvgate.notify_milliseconds = _notify_milliseconds
pvgate.notify_freq_hz = self.notify_freq_hz
if not pvgate.notify_unison:
pvgate.notify_unison = True
pvgate.monitor_start()
else:
self.cafe.updateMonitorPolicyDeltaMS(
pvgate.handle, pvgate.monitor_id,
pvgate.notify_milliseconds)
if self.timer is not None:
self.timer.stop()
else:
self.timer = QTimer()
self.timer.timeout.connect(self.widget_update)
self.timer.singleShot(0, self.widget_update)
if _notify_milliseconds > 0:
self.timer.start(_notify_milliseconds)
def table_ts_resolution_changed(self, new_idx):
for i, ts_res in enumerate(self.ts_combox_idx_dict.values()):
if i == new_idx:
self.format_ts_decimal_part = ts_res
break
for pvgate in self.pv_gateway:
_pvd = self.cafe.getPVCache(pvgate.handle)
if _pvd.value[0] is not None:
if isinstance(_pvd.value[0], float):
pvgate.trigger_monitor_float.emit(
_pvd.value[0], _pvd.status, _pvd.alarmSeverity)
elif isinstance(_pvd.value[0], int):
pvgate.trigger_monitor_int.emit(
_pvd.value[0], _pvd.status, _pvd.alarmSeverity)
else:
pvgate.trigger_monitor_str.emit(
str(_pvd.value[0]), _pvd.status, _pvd.alarmSeverity)
def display_table_parameters(self):
display_wgt = QDialog(self)
display_wgt.setWindowTitle("PV Parameters")
layout = QVBoxLayout()
common_label_width = 120
common_wgt_width = 160
common_hbox_width = common_label_width + common_wgt_width + 20
self.initial_value = 0
self.max_precision_value = 0
for i, pvgate in enumerate(self.pv_gateway):
if pvgate.pv_ctrl is not None:
if pvgate.pv_ctrl.precision > 0:
self.max_precision_value = max(self.max_precision_value,
pvgate.pv_ctrl.precision)
self.initial_value = max(self.initial_value,
pvgate.precision)
if self.max_precision_value > 0:
#precision user
_hbox_wgt = QWidget()
_hbox = QHBoxLayout()
precision_user_label = QLabel("Precision (user):")
self.table_precision_user_wgt = QSpinBox(self)
self.table_precision_user_wgt.setFocusPolicy(Qt.NoFocus)
self.table_precision_user_wgt.setValue(self.initial_value)
self.table_precision_user_wgt.setMaximum(self.max_precision_value)
self.table_precision_user_wgt.valueChanged.connect(
self.table_precision_user_changed)
precision_user_label.setAlignment(Qt.AlignLeft)
self.table_precision_user_wgt.setAlignment(Qt.AlignLeft)
_hbox.addWidget(precision_user_label)
_hbox.addWidget(self.table_precision_user_wgt)
_hbox.setAlignment(Qt.AlignLeft)
_hbox_wgt.setLayout(_hbox)
precision_user_label.setFixedWidth(common_label_width)
self.table_precision_user_wgt.setFixedWidth(40)
_hbox_wgt.setFixedWidth(common_hbox_width)
#precision ioc
_hbox2_wgt = QWidget()
_hbox2 = QHBoxLayout()
precision_ioc_label = QLabel("Precision (ioc): ")
precision_ioc = QPushButton(self)
precision_ioc.setText("Reset")
precision_ioc.clicked.connect(self.table_precision_ioc_reset)
precision_ioc_label.setAlignment(Qt.AlignLeft)
_hbox2.addWidget(precision_ioc_label)
_hbox2.addWidget(precision_ioc)
_hbox2.setAlignment(Qt.AlignLeft)
_hbox2_wgt.setLayout(_hbox2)
precision_ioc_label.setFixedWidth(common_label_width)
precision_ioc.setFixedWidth(50)
_hbox2_wgt.setFixedWidth(common_hbox_width)
layout.addWidget(_hbox_wgt)
layout.addWidget(_hbox2_wgt)
if 'Timestamp' in self.columns_dict.keys():
#time-stamp
_hbox4_wgt = QWidget()
_hbox4 = QHBoxLayout()
ts_label = QLabel("Timestamp: ")
self.ts_combox_idx_dict = {
'second (s)': self.format_ts_sec,
'decisecond (ds)': self.format_ts_deci,
'millisecond (ms)': self.format_ts_milli,
'microsecond (\u03bcs)': self.format_ts_micro,
'nanosecond (ns)': self.format_ts_nano}
ts_resolution = QComboBox(self)
for key, ts_res in self.ts_combox_idx_dict.items():
ts_resolution.addItem(key)
_current_idx = 0
for i, (key, ts_res) in enumerate(self.ts_combox_idx_dict.items()):
if ts_res == self.format_ts_decimal_part:
_current_idx = i
break
ts_resolution.setCurrentIndex(_current_idx)
ts_resolution.currentIndexChanged.connect(
self.table_ts_resolution_changed)
_hbox4.addWidget(ts_label)
_hbox4.addWidget(ts_resolution)
_hbox4_wgt.setLayout(_hbox4)
ts_label.setFixedWidth(common_label_width)
ts_resolution.setFixedWidth(common_wgt_width)
_hbox4_wgt.setFixedWidth(common_hbox_width)
layout.addWidget(_hbox4_wgt)
#precision refresh rate
_hbox3_wgt = QWidget()
_hbox3 = QHBoxLayout()
refresh_freq_label = QLabel("Refresh rate: ")
#_default_refresh_val = 0 if self.notify_freq_hz <= 0 else \
# self.notify_freq_hz
_default_refresh_val = 0 if self.notify_freq_hz_default <= 0 else \
self.notify_freq_hz_default
self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1,
5: 0.5, 6: _default_refresh_val}
refresh_freq = QComboBox(self)
refresh_freq.addItem('direct')
refresh_freq.addItem('{0} Hz'.format(
self.refresh_freq_combox_idx_dict[1]))
refresh_freq.addItem('{0} Hz'.format(
self.refresh_freq_combox_idx_dict[2]))
refresh_freq.addItem('{0} Hz'.format(
self.refresh_freq_combox_idx_dict[3]))
refresh_freq.addItem('{0} Hz'.format(
self.refresh_freq_combox_idx_dict[4]))
refresh_freq.addItem('{0} Hz'.format(
self.refresh_freq_combox_idx_dict[5]))
_default_text = 'default (direct)' if _default_refresh_val == 0 else \
'default ({0} Hz)'.format(self.refresh_freq_combox_idx_dict[6])
refresh_freq.addItem(_default_text)
for key, value in self.refresh_freq_combox_idx_dict.items():
if value == self.notify_freq_hz:
refresh_freq.setCurrentIndex(key)
break
refresh_freq.currentIndexChanged.connect(
self.table_refresh_rate_changed)
_hbox3.addWidget(refresh_freq_label)
_hbox3.addWidget(refresh_freq)
_hbox3_wgt.setLayout(_hbox3)
refresh_freq_label.setFixedWidth(common_label_width)
refresh_freq.setFixedWidth(common_wgt_width)
_hbox3_wgt.setFixedWidth(common_hbox_width)
layout.addWidget(_hbox3_wgt)
layout.setAlignment(Qt.AlignLeft)
layout.setContentsMargins(10, 0, 0, 0)
layout.setSpacing(0)
display_wgt.setMinimumWidth(340)
display_wgt.setLayout(layout)
display_wgt.exec()
def mousePressEvent(self, event):
row = self.indexAt(event.pos()).row()
if row > -1:
if row < len(self.pv_list):
self.pv_gateway[row].mousePressEvent(event)
else:
button = event.button()
if button == Qt.RightButton:
self.table_context_menu.exec(QCursor.pos())
self.clearFocus()
#remove highlighting which persists after mouse leaves
def mouseMoveEvent(self, event):
pass
def leaveEvent(self, event):
self.clearSelection()
self.clearFocus()
del event
class QMessageWidget(QListWidget):
"""Log message window."""
def __init__(self, parent=None):
super(QMessageWidget, self).__init__(parent)
self.myItem = None
self.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.setFocusPolicy(Qt.StrongFocus)
def leaveEvent(self, event):
if self.myItem:
self.clearSelection()
self.clearFocus()
del event
def mousePressEvent(self, event):
item = self.itemAt(event.x(), event.y())
if item:
self.myItem = item
self.setCurrentItem(self.myItem)
def keyPressEvent(self, event):
if event.matches(QKeySequence.Copy):
nitem = event.count()
if nitem:
if self.myItem is not None:
_str = self.myItem.text()
QApplication.clipboard().setText(_str)
class QResultsWidget:
"""Results table"""
def __init__(self, summary_dict=None, table_dict=None):
self.summary_dict = summary_dict
self.table_dict = table_dict
self._group_box = None
def group_box(self, title=""):
self._group_box = QGroupBox(title)
self._group_box.setObjectName("OUTERLEFT")
_vbox = QVBoxLayout()
_qspace = QFrame()
_qspace.setFixedHeight(10)
_vbox.addWidget(_qspace)
_font = QFont("Sans Serif", 10)
longest_str_item1 = ""
longest_str_item2 = ""
for i, (label, text) in enumerate(self.summary_dict.items()):
if len(str(label)) > len(longest_str_item1):
longest_str_item1 = str(label)
if len(str(text)) > len(longest_str_item2):
longest_str_item2 = str(text)
fm = QFontMetricsF(_font)
_factor = 1.15
if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"):
_factor = 1.18
qrect1 = fm.boundingRect(longest_str_item1)
qrect2 = fm.boundingRect(longest_str_item2)
_width_scaling_factor = 1.5
_width_scaling_factor_le = 1.15
_widget_height = 25
for i, (label, text) in enumerate(self.summary_dict.items()):
#print(label, text)
qlabel = QLabel(label)
qle = QLineEdit(text)
qlabel.setFont(_font)
qlabel.setStyleSheet(("QLabel{color:black;" +
"margin:0px; padding:2px;}"))
qlabel.setFixedWidth(int(qrect1.width() * _width_scaling_factor))
qlabel.setFixedHeight(_widget_height)
qle.setFocusPolicy(Qt.NoFocus)
qle.setFont(_font)
qle.setStyleSheet(("QLineEdit{color:blue;" +
"background-color: lightgray;" +
"qproperty-readOnly: true;" +
"margin:0px; padding:2px;}"))
qle.setFixedWidth(int(qrect2.width() * _width_scaling_factor_le))
qle.setFixedHeight(_widget_height)
qle.setAlignment(Qt.AlignRight)
_hbox_widget = QWidget()
_hbox = QHBoxLayout()
_hbox.addWidget(qlabel)
_hbox.addWidget(qle)
_hbox_widget.setLayout(_hbox)
_hbox.setAlignment(Qt.AlignCenter)
_hbox.setContentsMargins(0, 2, 0, 0)
_vbox.addWidget(_hbox_widget)
_vbox.setContentsMargins(0, 0, 0, 0)
_vbox.setAlignment(Qt.AlignCenter|Qt.AlignTop)
_vbox2_widget = QWidget()
_vbox2 = QVBoxLayout()
_vbox2.setContentsMargins(0, 20, 0, 40)
table = QTableWidget(len(self.table_dict)-1, 2)
table.verticalHeader().setVisible(False)
table.setFocusPolicy(Qt.NoFocus)
#table.setFont(_font)
longest_str_item1 = ""
longest_str_item2 = ""
for i, (label, text) in enumerate(self.table_dict.items()):
item1 = QTableWidgetItem(str(label))
item2 = QTableWidgetItem(str(text))
item1.setTextAlignment(Qt.AlignCenter)
item2.setTextAlignment(Qt.AlignCenter)
item1.setForeground(QColor("black"))
item2.setForeground(QColor("black"))
if i%2 == 0:
item1.setBackground(QColor("lightgray"))
item2.setBackground(QColor("lightgray"))
if len(str(label)) > len(longest_str_item1):
longest_str_item1 = str(label)
if len(str(text)) > len(longest_str_item2):
longest_str_item2 = str(text)
if i == 0:
#item1.setFont(_font)
#item2.setFont(_font)
table.setHorizontalHeaderItem(0, item1)
table.setHorizontalHeaderItem(1, item2)
else:
table.setItem(i-1, 0, item1)
table.setItem(i-1, 1, item2)
fm = QFontMetricsF(_font)
_factor = 1.2
if LooseVersion(QT_VERSION_STR) < LooseVersion("5.0"):
_factor = 1.18
qrect = fm.boundingRect(longest_str_item1 + longest_str_item2)
_width_scaling_factor = 1.04
table.resizeColumnsToContents()
table.resizeRowsToContents()
table.setFixedHeight(int(fm.lineSpacing() * _factor * len(
self.table_dict)) + fm.lineSpacing()*2)
table.setFixedWidth(int(qrect.width() * _width_scaling_factor))
_vbox2.addWidget(table)
_vbox2.setAlignment(Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop))
_vbox2_widget.setLayout(_vbox2)
_vbox.addWidget(_vbox2_widget)
self._group_box.setLayout(_vbox)
self._group_box.setContentsMargins(20, 20, 20, 20)
self._group_box.setAlignment(Qt.AlignTop)
self._group_box.setFixedHeight(int(
table.height() + (_widget_height*len(self.summary_dict))))
self._group_box.setFixedWidth(int(table.width() + 20))
return self._group_box
class QResultsTableWidget():
"""Results table"""
def __init__(self, column_headings=None):
self.column_headings = column_headings
self._group_box = None
def group_box(self, title="Table of Results"):
self._group_box = QGroupBox(title)
self._group_box.setObjectName("OUTER")
_font = QFont("Sans Serif", 10)
_vbox2_widget = QWidget()
_vbox2 = QVBoxLayout()
_vbox2.setContentsMargins(0, 20, 0, 40)
table = QTableWidget(1, len(self.column_headings))
table.verticalHeader().setVisible(True)
table.setFocusPolicy(Qt.NoFocus)
table.setFont(_font)
for i, heading in enumerate(self.column_headings):
_item = QTableWidgetItem(str(heading))
table.setHorizontalHeaderItem(i, _item)
table.resizeColumnsToContents()
table.resizeRowsToContents()
table.setFixedHeight(400)
_vbox2.addWidget(table)
_vbox2.setAlignment(Qt.AlignmentFlag(Qt.AlignCenter|Qt.AlignTop))
_vbox2_widget.setLayout(_vbox2)
self._group_box.setLayout(_vbox2)
self._group_box.setContentsMargins(20, 20, 20, 20)
self._group_box.setAlignment(Qt.AlignTop)
self._group_box.setFixedWidth(int(table.width() + 20))
return self._group_box
class QHDFDockWidget(QDockWidget):
def __init__(self, title=None, parent=None):
super().__init__(title, parent)
self.parent = parent
self.is_docked = True
self.geometry_from_qsettings = self.parent.application_geometry
self.topLevelChanged.connect(self._top_level_changed)
self.setVisible(False)
self.setFloating(False)
self.geometry_from_qsettings = self.parent.geometry()
def closeEvent(self, event: QCloseEvent):
super().closeEvent(event)
self.parent.setGeometry(self.geometry_from_qsettings)
self.setGeometry(self.geometry_from_qsettings)
QApplication.processEvents()
self.parent.setGeometry(self.geometry_from_qsettings)
def changeEvent(self, event):
pass
def _top_level_changed(self, is_floating):
pass
class QNoDockWidget(QDockWidget):
def __init__(self, title=None, parent=None):
super().__init__(title, parent)
self.parent = parent
self.is_docked = True
self.geometry_from_qsettings = self.parent.application_geometry
self.topLevelChanged.connect(self._top_level_changed)
self.setVisible(False)
self.setFloating(True)
x = self.geometry_from_qsettings.x() + 480 # 3500 #screen.width() - widget.width()
y = self.geometry_from_qsettings.y() + 350 #100 #screen.height() - widget.height()
self.move(x, y)
def changeEvent(self, event):
if "QAbstractButton" in str(self.sender()):
self.geometry_from_qsettings = self.parent.geometry()
def _top_level_changed(self): #, is_floating):
self.setVisible(False)
self.setFloating(True)
#ResetGeometry
self.parent.setGeometry(self.geometry_from_qsettings)
QApplication.processEvents()
class CAQStripChart(PlotWidget):
'''Channel access enabled pyqtgraph.PlotWidget'''
def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'],
monitor_callback=None, pv_within_daq_group: bool = False,
color_mode=None, show_units: bool = False, prefix: str = "",
suffix: str = "", notify_freq_hz: int = 0, title: str = "",
ylabel: str = "", force_ts_align = True, text_label = [],
pen_color_idx = 0):
super().__init__()
self.no_channels = len(pv_list)
self.pen_color_idx = pen_color_idx
self.text_label = text_label
self.found = False
self.time_zero = [0] * self.no_channels
self.time_delta = [0] * self.no_channels
self.pv_list = pv_list
self.pv2item_dict = {}
self.pv_gateway = [None] * self.no_channels
self.pvd_previous_list = [None] * self.no_channels
self.val_previous = [None] * self.no_channels
self.curve = [None] * self.no_channels
for i in range (0, len(self.pv_list)):
self.pv_gateway[i] = PVGateway(
parent, self.pv_list[i], monitor_callback, pv_within_daq_group,
color_mode, show_units, prefix, suffix,
#connect_callback=self.py_connect_callback,
connect_triggers=False, notify_freq_hz=notify_freq_hz,
monitor_dbr_time = True)
self.pv_gateway[i].is_initialize_complete()
self.pvd_previous_list[i] = self.pv_gateway[i].pvd
self.pv_gateway[i].trigger_connect.connect(
self.receive_connect_update)
self.pv_gateway[i].trigger_monitor_str.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_int.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_float.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor.connect(
self.receive_monitor_dbr_time)
self.pv_gateway[i].widget_class = "PlotWidget"
self.pv2item_dict[self.pv_gateway[i]] = i
self.cafe = self.pv_gateway[0].cafe
self.cyca = self.pv_gateway[0].cyca
for i in range(0, len(self.pv_gateway)):
if self.cafe.isConnected(self.pv_gateway[i].pv_name):
self.pv_gateway[i].trigger_connect.emit(
self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name),
self.pv_gateway[i].cyca.ICAFE_CS_CONN)
for i in range(0, len(self.pv_gateway)):
if not self.pv_gateway[i].pv_within_daq_group:
self.pv_gateway[i].monitor_start()
sampleinterval = 0.2
timewindow = 1800.0
self.ts_delta_max = 0.6
# Data stuff
self._interval = int(sampleinterval*1000)
self._bufsize = 10000 #int(timewindow/0.33)
self._bufsize2 = 10000 # int(timewindow/1.33)
self.databuffer = [None] * self.no_channels
self.timebuffer = [None] * self.no_channels
self.x = [None] * self.no_channels
self.y = [None] * self.no_channels
self.x_shifted = [None] * self.no_channels
self.idx = [0] * self.no_channels
for i in range(0, self.no_channels):
bsize = self._bufsize if i == 0 else self._bufsize2
self.databuffer[i] = collections.deque([None]*bsize, bsize)
self.timebuffer[i] = collections.deque([0]*bsize, bsize)
self.x[i] = np.zeros(bsize, dtype=np.float)
self.y[i] = np.zeros(bsize, dtype=np.float)
_long_size=20
#self.data_series_buffer = collections.deque([0]*_long_size, _long_size)
#self.time_series_buffer = collections.deque([0]*_long_size, _long_size)
#self.data_series = [] * self.no_channels
#self.time_series = [] * self.no_channels
self.iflag_series = 0
#self.x = np.linspace(-timewindow, 0.0, self._bufsize)
#self.x_series = np.zeros(_long_size, dtype=np.float)
#self.y_series = np.zeros(_long_size, dtype=np.float)
if title is not None:
self.setTitle(str(title)) #self.pv_gateway[0].pv_name)
self.showGrid(x=True, y=True)
#self.setLabel('left', ylabel, self.pv_gateway[0].units)
self.setLabel('bottom', 'time', 's')
self.setBackground((60, 60, 60)) #247, 236, 249))
#self.setLimits(yMin=-0.11)
#self.setLimits(yMin=-1, yMax=1)
ax = pg.AxisItem('left')
ax.enableAutoSIPrefix(enable=False)
ax.setLabel(ylabel, self.pv_gateway[0].units)
ax.setGrid(155)
ay = pg.AxisItem('bottom')
ay.enableAutoSIPrefix(enable=False)
ay.setLabel('time', 'min')
ay.setGrid(175)
if 'BPM' in text_label:
ax.setTickSpacing(0.2, 0.1)
#ax.setRange(-1, 1)
#ax.setParentItem(self.graphicsItem())
axitems = {}
axitems['left'] = ax
axitems['bottom'] = ay
self.setAxisItems(axitems)
pg.setConfigOption('leftButtonPan', False)
self.plotItem.setMouseEnabled(y=True) # Only allow zoom in X-axis
self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis
#(125, 249, 255)
if self.pen_color_idx == 0:
pen_list = [ (255, 155, 0), (255,255,0), (0, 180, 255) ]
elif self.pen_color_idx == 1:
pen_list = [ (125, 249, 255), (255,255,0), (0, 180, 255) ]
else:
pen_list = [ (0, 180, 255), (125, 249, 255), (255,255,0) ]
for i in range(0, len(self.pv_gateway)):
self.curve[i] = self.plot(self.x[0], self.y[0], pen=pen_list[i]) # (0, 253, 235))
#self.curve[1] = self.plot(self.x[1], self.y[1], pen=(255,255,0))
#offset=(1.0, 1.0),
l=pg.LegendItem() #horSpacing=20, verSpacing=0, labelTextColor=(205, 205, 205),
#labelTextSize='6px', colCount=1)
l.setParentItem(self.graphicsItem())
l.anchor((0,0), (0.08, 0.0))
#l.setLabelTextColor((205, 205, 205))
#l.setLabelTextSize(9) does not exists(!)
#l.setOffset(-60)
for curv, label in zip(self.curve, self.text_label):
l.addItem(curv, label)
#for curv, pv in zip(self.curve, self.pv_gateway):
# l.addItem(curv, self.textpv.pv_name)
#self.daq_stop()
#print(self._bufsize)
#print(len(self.x), len(self.y))
QApplication.processEvents()
@Slot(object, int)
def receive_monitor_dbr_time(self, pvdata, alarm_severity):
#if not self.mutex.tryLock(): #locked():
# print("Event locked", pvdata.ts[0], pvdata.ts[1])
# return
#self.mutex.lock() #acquire()
_row = self.pv2item_dict[self.sender()]
#print("row, value from pvdata==>", _row, pvdata.value[0], self.pv_gateway[_row].pv_name)
ts_now = pvdata.ts[0] + pvdata.ts[1] * 10**(-9)
ts_previous = (self.pvd_previous_list[_row].ts[0] +
self.pvd_previous_list[_row].ts[1] * 10**(-9))
ts_delta = ts_now - ts_previous
if (pvdata.ts[0] == self.pvd_previous_list[_row].ts[0]) and (
pvdata.ts[1] == self.pvd_previous_list[_row].ts[1]):
#print("SAME TIMESTAMP")
#pvdata.show()
#self.pvd_previous_list[_row].show()
#print("================")
#self.mutex.unlock() #release()
return
if pvdata.ts[0] < self.pvd_previous_list[_row].ts[0]:
print("Funny ts value", self.pv_gateway[_row].pv_name)
pvdata.show()
#self.mutex.unlock() #release()
return
value = pvdata.value[0]
#discard first callbacks
#if ts_delta > 2.0:
# self.pvd_previous_list[_row] = _pvd
# return;
self.pvd_previous_list[_row] = pvdata
self.val_previous[_row] = value
#self.pvd_previous_list[_row].ts[0] = _pvd.ts[0]
#self.pvd_previous_list[_row].ts[1] = _pvd.ts[1]
self.databuffer[_row].append(value)
self.timebuffer[_row].append(self.time_delta[_row])
highest_ts = self.timebuffer[0][0] \
if self.timebuffer[0][0] is not None else 0
for i in range(1, len(self.timebuffer)):
if self.timebuffer[i][0] is None:
continue
elif self.timebuffer[i][0] > highest_ts:
highest_ts = self.timebuffer[i][0]
if self.timebuffer[_row][0] is not None:
for i, val in enumerate(self.timebuffer[_row]):
if val > highest_ts:
self.idx[_row] = i - 1
break
self.y[_row][:] = self.databuffer[_row]
self.x[_row][:] = self.timebuffer[_row]
idx = self.idx[_row]
self.x_shifted[_row] = list(map(lambda m : (m - self.time_delta[_row]), self.x[_row][idx:]))
#print("idx", self.idx)
#if 'AVG' in self.pv_gateway[_row].pv_name:
# for row in range(0, len(self.curve)):
# self.curve[row].setData(self.x_shifted[row], self.y[row][idx:])
self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:])
self.time_delta[_row] = ((
pvdata.ts[0] + pvdata.ts[1]*10**(-9)) - self.time_zero[0])/60
#self.mutex.unlock() #release()
#QApplication.processEvents()
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
#self.pv_gateway.receive_monitor_update(value, status, alarm_severity)
_row = self.pv2item_dict[self.sender()]
#if _row == 1:
# return
print("row, value===>", _row, value, self.pv_gateway[_row].pv_name)
_pvd = self.pv_gateway[_row].cafe.getPVCache(
self.pv_gateway[_row].handle)
#print("value", _pvd.value[0], self.pvd_previous_list[_row].value[0])
_pvd2 = self.pv_gateway[_row].pvd
print ("val", value, _pvd2.value[0], _pvd.value[0], self.pvd_previous_list[_row].value[0])
ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9)
ts_previous = (self.pvd_previous_list[_row].ts[0] +
self.pvd_previous_list[_row].ts[1] * 10**(-9))
ts_delta = ts_now - ts_previous
if value == self.val_previous[_row]:
#if (_pvd.ts[0] == self.pvd_previous_list[_row].ts[0]) and (
# _pvd.ts[1] == self.pvd_previous_list[_row].ts[1]):
_pvd.show()
#self.pvd_previous_list[_row].show()
return
#discard first callbacks
#if ts_delta > 2.0:
# self.pvd_previous_list[_row] = _pvd
# return;
self.pvd_previous_list[_row] = _pvd2
self.val_previous[_row] = value
#self.pvd_previous_list[_row].ts[0] = _pvd.ts[0]
#self.pvd_previous_list[_row].ts[1] = _pvd.ts[1]
self.databuffer[_row].append(value)
self.timebuffer[_row].append(self.time_delta[_row])
highest_ts = self.timebuffer[0][0] \
if self.timebuffer[0][0] is not None else 0
for i in range(1, len(self.timebuffer)):
if self.timebuffer[i][0] is None:
continue
elif self.timebuffer[i][0] > highest_ts:
highest_ts = self.timebuffer[i][0]
if self.timebuffer[_row][0] is not None:
for i, val in enumerate(self.timebuffer[_row]):
if val > highest_ts:
self.idx[_row] = i - 1
break
'''
for i in range(1, self.timebuffer):
if self.timebuffer[i][0] is not None:
a = self.timebuffer[0][0]
for i, val in enumerate(self.timebuffer[_row]):
if val > a:
idx = i - 1
break
'''
self.y[_row][:] = self.databuffer[_row]
self.x[_row][:] = self.timebuffer[_row]
#self.y[_row][:] = self.databuffer[_row]
#self.x[_row][:] = self.timebuffer[_row]
'''
#print(ts_delta, value, self.pvd_previous.value[0])
#if (ts_delta < self.ts_delta_max) and (value < self.pvd_previous.value[0]) :
if (value < self.pvd_previous.value[0]) :
self.data_series_buffer.append(value)
self.time_series_buffer.append(ts_now - self.time_zero ) #self.time_delta)
self.y_series[:] = self.data_series_buffer
self.x_series[:] = self.time_series_buffer
#print(self.x_series, self.y_series)
#elif ts_delta < 1.0:
if len(self.data_series_buffer) > 15:
#x_series = np.array(self.time_series, dtype=np.float)
#y_series = np.array(self.data_series, dtype=np.float)
_x=self.x_series.reshape((-1, 1))
model = LinearRegression()
model.fit(_x, self.y_series)
r_sq = model.score(_x, self.y_series)
###JCprint('coefficient of determination:', r_sq, "slope", model.coef_ , "lifetime:", self.y_series[0]/model.coef_ / 3600)
#print('intercept:', model.intercept_)
#print('slope:', model.coef_)
#print('max value', y_series[0], y_series[1])
if r_sq > 0.995:
_I = self.y_series[0]
###JCprint("lifetime:", _I/model.coef_ / 3600)
y_pred = model.predict(_x)
#print("len, y_pred, _x", len(y_pred), len(self.y_series), len(_x))
#print('predicted response:', y_pred, sep='\n')
m_sq_error = mean_squared_error(self.y_series, y_pred)
#print('Mean squared error: {0:.9f}'.format(
# mean_squared_error(y_series, y_pred)))
#print('Coefficient of determination: {0:.9f}'.format(
# r2_score(y_series, y_pred)))
self.trigger_series_sequence.emit(self.x_series, self.y_series)
#print("emit")
self.data_series = []
self.time_series = []
#print(len(self.x_series), len(self.y_series))
else:
self.data_series = []
self.time_series = []
'''
#dt = (self.x[-1] - self.x[-2])
#print("dt", dt)
#Lowet IPCT before trigger is set to t=0
idx = self.idx[_row]
self.x_shifted[_row] = list(map(lambda m : (m - self.time_delta[_row]), self.x[_row][idx:]))
##self.y = np.where(self.y != self.y, 0, self.y) #test for nan
#print("row len len ", _row, self.time_delta[0], self.time_delta[1])
self.curve[_row].setData(self.x_shifted[_row], self.y[_row][idx:])
self.time_delta[_row] = (
_pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero[0]
'''
LOOK_BACK = -800
if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name:
LOOK_BACK = -250
if value > self.y[-2]:
if not self.found:
#print(x_shifted[-240:], self.y[-240:])
#self.y = np.where(self.y != self.y, 0, self.y) #test for nan
max_index = self.y[LOOK_BACK:].argmax()
if max_index == 0:
return
print("max index=", max_index)
#print(x_shifted[-600+max_index:], self.x[-600+max_index:])
#print(self.y[-600+max_index:-2])
self.found = True
#print("Are Signals blocked??", self.signalsBlocked())
self.trigger_decay_sequence.emit(np.array(
x_shifted[LOOK_BACK+max_index+9:-2]), self.y[LOOK_BACK+max_index+9:-2])
else:
self.found = False
'''
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
print("pv_name==>", pv_name)
_row = self.pv2item_dict[self.sender()]
self.pv_gateway[_row].receive_connect_update(handle, pv_name, status,
post_display=False)
#self.pv_gateway.receive_connect_update(handle, pv_name, status)
_pvd = self.pv_gateway[_row].cafe.getPVCache(self.pv_gateway[_row].handle)
if self.time_zero[_row] == 0:
self.time_zero[_row] = _pvd.ts[0] + _pvd.ts[1]*10**(-9)
self.pvd_previous = _pvd
#renove highlighting which persists after mouse leaves
def mouseMoveEvent(self, event):
#event.ignore()
pass
def leaveEvent(self, event):
self.clearFocus()
del event
class CAQPCTChart(PlotWidget):
'''Channel access enabled pyqtgraph.PlotWidget'''
#trigger_monitor_float = Signal(float, int, int)
#trigger_monitor_int = Signal(int, int, int)
#trigger_monitor_str = Signal(str, int, int)
#trigger_connect = Signal(int, str, int)
trigger_decay_sequence = Signal(np.ndarray, np.ndarray)
trigger_series_sequence = Signal(np.ndarray, np.ndarray)
#def py_connect_callback(self, handle, pvname, status):
# self.trigger_connect.emit(int(handle), str(pvname), int(status))
# print("py connect callback", handle, pvname, status)
def daq_start(self):
self.blockSignals(False)
def daq_pause(self):
self.blockSignals(True)
def daq_stop(self):
self.blockSignals(True)
def __init__(self, parent=None, pv_list: list = ['PV_NAME_NOT_GIVEN'],
monitor_callback=None, pv_within_daq_group: bool = False,
color_mode=None, show_units: bool = False, prefix: str = "",
suffix: str = "", notify_freq_hz: int = 0):
super().__init__()
self.found = False
self.time_zero = 0
self.time_delta = 0
self.pv_list = pv_list
self.pv2item_dict = {}
self.pv_gateway = [None] * len(self.pv_list)
self.pvd_previous = None
for i in range(0, len(self.pv_list)):
self.pv_gateway[i] = PVGateway(
parent, pv_list[i], monitor_callback, pv_within_daq_group,
color_mode, show_units, prefix, suffix,
#connect_callback=self.py_connect_callback,
connect_triggers=False, notify_freq_hz=notify_freq_hz)
self.pv_gateway[i].is_initialize_complete()
self.pv_gateway[i].trigger_connect.connect(
self.receive_connect_update)
self.pv_gateway[i].trigger_monitor_str.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_int.connect(
self.receive_monitor_update)
self.pv_gateway[i].trigger_monitor_float.connect(
self.receive_monitor_update)
self.pv_gateway[i].widget_class = "PlotWidget"
self.pv2item_dict[self.pv_gateway[i]] = i
self.cafe = self.pv_gateway[0].cafe
self.cyca = self.pv_gateway[0].cyca
for i in range(0, len(self.pv_gateway)):
if self.cafe.isConnected(self.pv_gateway[i].pv_name):
self.pv_gateway[i].trigger_connect.emit(
self.pv_gateway[i].handle, str(self.pv_gateway[i].pv_name),
self.pv_gateway[i].cyca.ICAFE_CS_CONN)
for i in range(0, len(self.pv_gateway)):
if not self.pv_gateway[i].pv_within_daq_group:
self.pv_gateway[i].monitor_start()
sampleinterval = 0.333
timewindow = 1800.0
self.ts_delta_max = 0.6
# Data stuff
self._interval = int(sampleinterval*1000)
self._bufsize = int(timewindow/sampleinterval)
self.databuffer = collections.deque([None]*self._bufsize, self._bufsize)
self.timebuffer = collections.deque([0]*self._bufsize, self._bufsize)
_long_size = 20
self.data_series_buffer = collections.deque([0]*_long_size, _long_size)
self.time_series_buffer = collections.deque([0]*_long_size, _long_size)
self.data_series = []
self.time_series = []
self.iflag_series = 0
#self.x = np.linspace(-timewindow, 0.0, self._bufsize)
self.x = np.zeros(self._bufsize, dtype=np.float)
self.y = np.zeros(self._bufsize, dtype=np.float)
self.x_series = np.zeros(_long_size, dtype=np.float)
self.y_series = np.zeros(_long_size, dtype=np.float)
self.setTitle("PCT(t)") #self.pv_gateway[0].pv_name)
self.showGrid(x=True, y=True)
self.setLabel('left', 'I', 'mA')
self.setLabel('bottom', 'time', 's')
self.setBackground((60, 60, 60)) #247, 236, 249))
self.setLimits(yMin=-0.11)
self.plotItem.setMouseEnabled(y=False) # Only allow zoom in X-axis
self.plotItem.setMouseEnabled(x=True) # Only allow zoom in Y-axis
self.curve = self.plot(self.x, self.y, pen=(125, 249, 255))
#self.curve2 = self.plot(self.x, self.y, pen=(255,255,0))
l = pg.LegendItem(offset=(0., 0.5))
l.setParentItem(self.graphicsItem())
l.setLabelTextColor((125, 249, 255))
l.addItem(self.curve, str(self.pv_gateway[0].pv_name))
'''
l2=self.addLegend()
l2.setLabelTextColor('g')
l2.setOffset(10)
l2.addItem(self.curve2, str(self.pv_gateway[0].pv_name))
'''
self.daq_stop()
print(self._bufsize)
print(len(self.x), len(self.y))
@Slot(str, int, int)
@Slot(int, int, int)
@Slot(float, int, int)
def receive_monitor_update(self, value, status, alarm_severity):
#self.pv_gateway.receive_monitor_update(value, status, alarm_severity)
_row = self.pv2item_dict[self.sender()]
#print("value===>", value, self.pv_gateway[_row].pv_name)
_pvd = self.pv_gateway[_row].cafe.getPVCache(
self.pv_gateway[_row].handle)
ts_now = _pvd.ts[0] + _pvd.ts[1] * 10**(-9)
ts_previous = self.pvd_previous.ts[0] + self.pvd_previous.ts[1]*10**(-9)
ts_delta = ts_now - ts_previous
if (_pvd.ts[0] == self.pvd_previous.ts[0]) and (
_pvd.ts[1] == self.pvd_previous.ts[1]):
#_pvd.show()
return
#discard first callbacks
if ts_delta > 2.0:
self.pvd_previous = _pvd
return
self.databuffer.append(value)
self.y[:] = self.databuffer
self.timebuffer.append(self.time_delta)
self.x[:] = self.timebuffer
#print(ts_delta, value, self.pvd_previous.value[0])
#if (ts_delta < self.ts_delta_max) and (value <
# self.pvd_previous.value[0]):
if value < self.pvd_previous.value[0]:
self.data_series_buffer.append(value)
self.time_series_buffer.append(ts_now - self.time_zero)
self.y_series[:] = self.data_series_buffer
self.x_series[:] = self.time_series_buffer
#print(self.x_series, self.y_series)
#elif ts_delta < 1.0:
if len(self.data_series_buffer) > 15:
#x_series = np.array(self.time_series, dtype=np.float)
#y_series = np.array(self.data_series, dtype=np.float)
_x = self.x_series.reshape((-1, 1))
model = LinearRegression()
model.fit(_x, self.y_series)
r_sq = model.score(_x, self.y_series)
###JCprint('coefficient of determination:',
###r_sq, "slope", model.coef_ , "lifetime:",
###self.y_series[0]/model.coef_ / 3600)
#print('intercept:', model.intercept_)
#print('slope:', model.coef_)
#print('max value', y_series[0], y_series[1])
if r_sq > 0.995:
#_I = self.y_series[0]
###JCprint("lifetime:", _I/model.coef_ / 3600)
####y_pred = model.predict(_x)
#print("len, y_pred, _x", len(y_pred), len(self.y_series),
# len(_x))
#print('predicted response:', y_pred, sep='\n')
##m_sq_error = mean_squared_error(self.y_series, y_pred)
#print('Mean squared error: {0:.9f}'.format(
# mean_squared_error(y_series, y_pred)))
#print('Coefficient of determination: {0:.9f}'.format(
# r2_score(y_series, y_pred)))
self.trigger_series_sequence.emit(self.x_series,
self.y_series)
#print("emit")
self.data_series = []
self.time_series = []
#print(len(self.x_series), len(self.y_series))
else:
self.data_series = []
self.time_series = []
self.pvd_previous = _pvd
#dt = (self.x[-1] - self.x[-2])
#print("dt", dt)
#Lowet IPCT before trigger is set to t=0
x_shifted = list(map(lambda m: (m - self.time_delta), self.x))
##self.y = np.where(self.y != self.y, 0, self.y) #test for nan
self.curve.setData(x_shifted, self.y)
self.time_delta = (
_pvd.ts[0] + _pvd.ts[1]*10**(-9)) - self.time_zero
#x_shifted2= list(map(lambda m : m -self.time_delta-1 , self.x))
#self.curve2.setData(x_shifted2, self.y)
#QApplication.processEvents()
#print(type(x_shifted), type(self.y), type([1.1]), type(1.1))
LOOK_BACK = -800
if 'ARIDI-PCT2:CURRENT' in self.pv_gateway[_row].pv_name:
LOOK_BACK = -250
if value > self.y[-2]:
if not self.found:
#print(x_shifted[-240:], self.y[-240:])
#self.y = np.where(self.y != self.y, 0, self.y) #test for nan
max_index = self.y[LOOK_BACK:].argmax()
if max_index == 0:
return
print("max index=", max_index)
#print(x_shifted[-600+max_index:], self.x[-600+max_index:])
#print(self.y[-600+max_index:-2])
self.found = True
#print("Are Signals blocked??", self.signalsBlocked())
self.trigger_decay_sequence.emit(
np.array(x_shifted[LOOK_BACK+max_index+9:-2]),
self.y[LOOK_BACK+max_index+9:-2])
else:
self.found = False
@Slot(int, str, int)
def receive_connect_update(self, handle: int, pv_name: str, status: int):
'''Triggered by connect signal'''
print("pv_name==>", pv_name)
_row = self.pv2item_dict[self.sender()]
self.pv_gateway[_row].receive_connect_update(handle, pv_name, status,
post_display=False)
#self.pv_gateway.receive_connect_update(handle, pv_name, status)
_pvd = self.pv_gateway[_row].cafe.getPVCache(
self.pv_gateway[_row].handle)
if self.time_zero == 0:
self.time_zero = _pvd.ts[0] + _pvd.ts[1]*10**(-9)
#print(self.time_zero)
self.pvd_previous = _pvd
#renove highlighting which persists after mouse leaves
def mouseMoveEvent(self, event):
pass
def leaveEvent(self, event):
self.clearFocus()
del event