Files
caqtwidgets/pvgateway.py
2021-11-02 14:32:16 +01:00

1909 lines
75 KiB
Python

"""The module provides data and metadata of a process variable through PyCafe."""
__author__ = 'Jan T. M. Chrin'
import copy
import inspect
import sys
import time
from datetime import datetime
from distutils.version import LooseVersion
from qtpy.QtCore import (QEvent, QObject, QMutex, QPoint, QProcess, QRect,
QSettings, Qt, QUrl, Signal, Slot)
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtGui import (QColor, QCursor, QDesktopServices, QFont, QPainter,
QPalette)
from qtpy.QtWidgets import (QAction, QApplication, QComboBox, QDialog,
QHBoxLayout, QLabel, QLineEdit, QMenu, QMessageBox,
QPushButton, QSpinBox, QVBoxLayout, QWidget)
from bdbase.readjson import Rjson
from bdbase.enumkind import DAQState
def __LINE__():
    """Return the source line number of the caller (like C's ``__LINE__``)."""
    # currentframe() is this function's frame; f_back is the caller's frame
    # and f_lineno is the line that frame is currently executing.
    return inspect.currentframe().f_back.f_lineno
class PVGateway(QWidget):
"""Retrieves pv metadata through PyCafe.
The PVGateway class when subclassed by Qt widgets enables their connectivity
to channel access.
Attributes:
monitor_id: (int) Monitor id (None until a monitor has been started)
units : (str) Units associated with the pv
trigger_monitor_<datatype> is the signal triggered by updates arising from
monitored pvs.
trigger_connect is the signal triggered from changes in pv connection status.
widget_handle_dict is a dictionary mapping widgets to their pv handle.
A pv handle may be associated to more than one widget.
"""
# Monitor signals: py_monitor_callback emits (value, status, alarm severity).
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
trigger_monitor = Signal(object, int) #pvdata, alarm severity (see py_monitor_callback)
# Connection signal: emitted with (handle, pv name, connection status).
trigger_connect = Signal(int, str, int)
# DAQ signals: connected to receive_daq_update(daq_pvd, daq_mode, daq_state).
trigger_daq = Signal(object, str, int)
trigger_daq_int = Signal(object, str, int)
trigger_daq_str = Signal(object, str, int)
#Properties, user supplied (the accepted values of the color_mode argument)
ACT_ON_BEAM = 'actOnBeam'
NOT_ACT_ON_BEAM = 'notActOnBeam'
READBACK_ALARM = 'alarm'
READBACK_STATIC = 'static'
#Properties, dynamic (set to True/False on the widget at runtime)
DISCONNECTED = 'disconnected'
ALARM_SEV_MINOR = 'alarmSevMinor'
ALARM_SEV_MAJOR = 'alarmSevMajor'
ALARM_SEV_INVALID = 'alarmSevInvalid'
# The no-alarm state reuses the 'alarm' property name.
ALARM_SEV_NO_ALARM = READBACK_ALARM
DAQ_STOPPED = 'stopped'
DAQ_PAUSED = 'paused'
#ObjectName, defined by CAQ<Widget>; keys of qt_object_to_property
PV_CONTROLLER = "Controller"
PV_READBACK = "Readback"
PV_DAQ_BS = "BSRead"
PV_DAQ_CA = "CARead"
# Name of the CAFE group used for pvs within the DAQ (see daq_group_name).
_DAQ_CAFE_SG_NAME = "gBS2CA"
# NOTE(review): presumably the record types that carry alarm severity;
# not referenced in the visible part of the file - confirm usage.
_alarm_severity_record_types = ["ai", "ao", "calc", "calcout", "dfanout",
"longin", "longout", "pid", "sel",
"steppermotor", "sub"]
#parent is Gui
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
             pv_within_daq_group: bool = False, color_mode = None,
             show_units: bool = False, prefix: str = "", suffix: str = "",
             connect_callback=None, msg_label: str = "",
             connect_triggers: bool = True, notify_freq_hz: int = 0,
             notify_unison: bool = False, precision: int = 0,
             monitor_dbr_time: bool = False):
    """Set up gateway state, signal connections and the context menu.

    Only the QWidget base class is initialised when ``parent`` is None or
    ``pv_name`` is empty; no gateway attribute is created in that case.

    Args:
        parent: Application object that supplies ``cafe``, ``cyca``,
            ``settings``, ``appname`` and ``showMessage``. It is stored as
            ``self.parent`` but deliberately NOT passed on to QWidget.
        pv_name: Name of the process variable.
        monitor_callback: Monitor callback used by monitor_start() instead
            of py_monitor_callback when given.
        pv_within_daq_group: Connect the trigger_daq* signals to
            receive_daq_update instead of the monitor signals.
        color_mode: One of ACT_ON_BEAM, NOT_ACT_ON_BEAM, READBACK_ALARM or
            READBACK_STATIC; any other value is treated as None.
        show_units: Append the pv units to the displayed value.
        prefix: Text placed in front of the displayed value.
        suffix: Text placed behind the displayed value.
        connect_callback: Connection callback; initialize() falls back to
            py_connect_callback when None.
        msg_label: Fixed label; when non-empty, value updates are not
            posted to the widget.
        connect_triggers: Connect the trigger_monitor* signals to the
            gateway's receive slots (ignored for pvs within the DAQ group).
        notify_freq_hz: Notification frequency; its absolute value is used
            and 0 leaves notify_milliseconds at 0.
        notify_unison: Only effective when notify_freq_hz is non-zero.
        precision: User precision; values <= 0 mean "use the pv precision".
        monitor_dbr_time: Emit trigger_monitor with the full pv data from
            py_monitor_callback instead of the typed value signals.
    """
    # Do NOT pass parent on to QWidget: a widget created with the main
    # window as Qt parent was incorrectly placed, and a QMainWindow parent
    # interferes with the toolbar (16 Aug. 2020).
    super().__init__()
    if parent is None:
        return
    # Compare by value; identity against a string literal is unreliable.
    if pv_name == "":
        return

    self.connect_callback = connect_callback
    self.notify_freq_hz = abs(notify_freq_hz)
    self.notify_freq_hz_default = self.notify_freq_hz
    self.notify_milliseconds = (0 if self.notify_freq_hz == 0
                                else 1000 / self.notify_freq_hz)
    self.notify_unison = bool(notify_unison and self.notify_freq_hz > 0)

    self.parent = parent
    self.pv_name = pv_name

    # Silently ignore color modes that are not one of the known constants.
    _valid_color_modes = (self.ACT_ON_BEAM, self.NOT_ACT_ON_BEAM,
                          self.READBACK_ALARM, self.READBACK_STATIC)
    self.color_mode = color_mode if color_mode in _valid_color_modes else None
    self.color_mode_requested = self.color_mode

    self.monitor_callback = monitor_callback
    self.pv_within_daq_group = pv_within_daq_group
    self.show_units = show_units
    self.prefix = prefix
    self.suffix = suffix

    self.cafe = self.parent.cafe
    self.cyca = self.parent.cyca
    if self.parent.settings is not None:
        self.settings = self.parent.settings
    else:
        self.settings = Rjson(self.parent.appname)

    self.daq_group_name = self._DAQ_CAFE_SG_NAME
    self.desc = None
    self.handle = None
    self.initialize_complete = False
    self.initialize_again = False
    self.msg_label = msg_label
    self.msg_press_value = None
    self.msg_release_value = None
    self.monitor_id = None
    self.monitor_dbr_time = monitor_dbr_time
    self.mutex_post_display = QMutex()

    # A user precision > 0 wins; otherwise start from the default of 3
    # until the precision of the pv itself is known.
    self.precision_user = precision
    self.has_precision_user = precision > 0
    self.precision_pv = 3
    self.precision = (self.precision_user if self.has_precision_user else
                      self.precision_pv)

    self.pvd = None
    self.pv_ctrl = None
    self.pv_info = None
    self.record_type = None
    self.showMessage = self.parent.showMessage
    self.qt_object_name = None

    # Dynamic Qt properties per object name; exactly one is True at a time.
    self.qt_property_controller = {
        self.DISCONNECTED: False,
        self.ACT_ON_BEAM: False, self.NOT_ACT_ON_BEAM: False
    }
    self.qt_property_readback = {
        self.DISCONNECTED: False,
        self.READBACK_ALARM: False, self.READBACK_STATIC: False,
        self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
        self.ALARM_SEV_INVALID: False
    }
    self.qt_property_daq_bs = {
        self.DISCONNECTED: False,
        self.READBACK_ALARM: False, self.READBACK_STATIC: False,
        self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
        self.ALARM_SEV_INVALID: False,
        self.DAQ_STOPPED: False,
        self.DAQ_PAUSED: False
    }
    self.qt_property_daq_ca = {
        self.DISCONNECTED: False,
        self.READBACK_ALARM: False, self.READBACK_STATIC: False,
        self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
        self.ALARM_SEV_INVALID: False,
        self.DAQ_STOPPED: False,
        self.DAQ_PAUSED: False
    }
    self.qt_object_to_property = {
        self.PV_CONTROLLER: self.qt_property_controller,
        self.PV_READBACK: self.qt_property_readback,
        self.PV_DAQ_BS: self.qt_property_daq_bs,
        self.PV_DAQ_CA: self.qt_property_daq_ca
    }
    self._qt_property_selected = {}

    self.status_tip = None
    self.suggested_text = ""
    self.time_monotonic = time.monotonic()
    self.pvd_previous = None
    self.timeout = 0.2
    self.units = ""
    self.widget = self

    # str(cls) looks like "<class 'pkg.module.Name'>"; keep the last part.
    _widget_name_part = str(self.widget.__class__).split("\'")[1].split(".")
    self.widget_class = _widget_name_part[-1]

    if pv_within_daq_group:
        self.trigger_daq_int.connect(self.receive_daq_update)
        self.trigger_daq.connect(self.receive_daq_update)
        self.trigger_daq_str.connect(self.receive_daq_update)
    elif connect_triggers:
        self.trigger_monitor.connect(self.receive_monitor_dbr_time)
        self.trigger_monitor_str.connect(self.receive_monitor_update)
        self.trigger_monitor_int.connect(self.receive_monitor_update)
        self.trigger_monitor_float.connect(self.receive_monitor_update)
    # Connection updates are needed in every mode: initialize() emits
    # trigger_connect for DAQ-group pvs as well.
    self.trigger_connect.connect(self.receive_connect_update)

    self.context_menu = QMenu()
    self.context_menu.setObjectName("contextMenu")
    self.context_menu.setWindowModality(Qt.NonModal)  # not ApplicationModal
    if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.0"):
        self.context_menu.addSection("PV: {0}".format(self.pv_name))

    action1 = QAction("Text Info", self)
    action1.triggered.connect(self.pv_status_text)
    action2 = QAction("Lookup in Archiver", self)
    action2.triggered.connect(self.lookup_archiver)
    action3 = QAction("Lookup in Databuffer", self)
    action3.triggered.connect(self.lookup_databuffer)
    action4 = QAction("Strip Chart (PShell)", self)
    action4.triggered.connect(self.strip_chart)
    action6 = QAction("Configure Display Parameters", self)
    action6.triggered.connect(self.display_parameters)
    self.context_menu.addAction(action1)
    self.context_menu.addAction(action2)
    self.context_menu.addAction(action3)
    self.context_menu.addAction(action4)

    action5 = QAction("Reconnect: {0}".format(self.pv_name), self)
    action5.triggered.connect(self.reconnect_channel)
    _font = QFont()
    _font.setPixelSize(12)
    action5.setFont(_font)
    if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
        self.context_menu.addSection("")
    # TODO: eventually add action6 only for pvs with a precision, i.e.
    # after initialize() when self.pv_ctrl.precision > 0.
    self.context_menu.addAction(action6)
    self.context_menu.addAction(action5)

    self.pv_message_in_a_box = QMessageBox()
    self.pv_message_in_a_box.setObjectName("pvinfo")
    # Qt.NonModal often causes a harmless
    # "QXcbConnection: XCB error: 3 (BadWindow)" message, but only if the
    # window is closed too quickly. Qt.ApplicationModal is not used as it
    # blocks input to all windows.
    self.pv_message_in_a_box.setWindowModality(Qt.NonModal)
    self.pv_message_in_a_box.setIcon(QMessageBox.Information)
    # The single Close button is also the default button.
    self.pv_message_in_a_box.setStandardButtons(QMessageBox.Close)
    self.pv_message_in_a_box.setDefaultButton(QMessageBox.Close)

    self.initialize()
    # __init__ must return None: returning self raises TypeError whenever
    # the class is instantiated without an overriding __init__.
def initialize(self):
'''Initialize class attributes and connect to ca if required.

For a pv flagged as within the DAQ group the handle is looked up in
daq_group_name; otherwise an existing handle for pv_name is reused or a
new channel is opened. The widget is registered with cafe through
addWidget() and initialize_meta_data() is called.
'''
# NOTE(review): leading indentation is missing in this copy of the file, so
# the statements below are left untouched rather than re-nested by guesswork.
_handle_within_group_flag = False
if self.pv_within_daq_group:
self.handle = self.cafe.getHandleFromPVWithinGroup(
self.pv_name, self.daq_group_name)
if self.handle > 0:
self.cafe.addWidget(self.handle, self.widget)
_handle_within_group_flag = True
#Callback already invoked to emit signal here!!
_channel_info = self.cafe.getChannelInfo(self.handle)
# NOTE(review): the two print() calls below are debug output written to stdout.
print(self.pv_name, self.handle)
w = self.cafe.getWidgets(self.handle)
print("widget list", w)
#_channel_info.show()
# Report the current connection state of the group channel to the widget.
self.trigger_connect.emit(
int(self.handle), str(self.pv_name),
int(_channel_info.cafeConnectionState))
#In case user is misinformed (pv flagged as DAQ pv but no handle found in the group)
if not _handle_within_group_flag:
self.handle = self.cafe.getHandleFromPV(self.pv_name)
# Fall back to the gateway's own connection callback.
if self.connect_callback is None:
self.connect_callback = self.py_connect_callback
if self.handle > 0:
# Handle already exists: attach callback and widget, then report its state.
#The second time round, widget is gateway rather than parent, Why is that?
self.cafe.setPyConnectCallbackFn(self.handle,
self.connect_callback)
self.cafe.addWidget(self.handle, self.widget)
_channel_info = self.cafe.getChannelInfo(self.handle)
self.trigger_connect.emit(
self.handle, self.pv_name,
int(_channel_info.cafeConnectionState))
#print("====OLD===============", self.handle, self.widget, self.parent)
else:
# No handle yet: open the channel and wait up to self.timeout for it.
self.cafe.openPrepare()
self.handle = self.cafe.open(self.pv_name,
self.connect_callback)
self.cafe.addWidget(self.handle, self.widget)
self.cafe.openNowAndWait(self.timeout, self.handle)
#self.cafe.openNow()
#_channel_info = self.cafe.getChannelInfo(self.handle)
#self.trigger_connect.emit(int(self.handle), str(self.pv_name), int(_channel_info.cafeConnectionState))
#print("====NEW============ ==", self.handle, self.widget)
self.initialize_meta_data()
self.pv_message_in_a_box.setWindowTitle(self.pv_name)
def initialize_meta_data(self):
'''Cache pv metadata and derive the widget's suggested text.

When the channel is connected and its initial callback is complete, the
pv data, control data, channel info and record type are cached and
initialize_complete is set. suggested_text and max_control_abs_str are
then built from prefix, control limits or enum strings, precision, units
and suffix. Both strings are stored in QSettings when connected and
restored from QSettings otherwise.
'''
# NOTE(review): leading indentation is missing in this copy of the file, so
# the statements below are left untouched rather than re-nested by guesswork.
_current_value = ""
if self.cafe.isConnected(self.handle) and \
self.cafe.initCallbackComplete(self.handle):
if self.pvd is None:
self.pvd = self.cafe.getPVCache(self.handle)
if self.pv_ctrl is None:
self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
self.set_precision_and_units()
if self.pv_info is None:
self.pv_info = self.cafe.getChannelInfo(self.pv_name)
# Class name unavailable: read the record type from <record>.RTYP instead.
if "Not Supported" in self.pv_info.className:
_rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
self.record_type = _rtype if _rtype is not None else self.pv_info.className
# The RTYP channel is closed again; the value assigned to _rtype here is unused.
_rtype = self.cafe.close(self.pv_name.split(".")[0] + ".RTYP")
else:
self.record_type = self.pv_info.className
#print ("record_type", self.record_type)
_current_value = self.cafe.getCache(self.handle)
if isinstance(_current_value, (int, float)):
#always show the sign, left aligned
_value_form = ("{:<+.%sf}" % self.precision)
_current_value = _value_form.format(
round(_current_value, self.precision))
#if self.desc is None:
# self.set_desc()
#Reset
self.initialize_complete = True
#verify user input: units are not shown twice if the suffix already is the units
if self.show_units is True:
if self.suffix == self.units and self.units != "":
self.show_units = False
self.suggested_text = self.prefix
if len(self.prefix) > 0:
self.suggested_text += " "
_suggested_text_from_value = " "
_max_control_abs = 0
if self.pv_ctrl is not None:
# Largest absolute control limit, truncated to an integer.
_lower_control_abs = abs(int(self.pv_ctrl.lowerControlLimit))
_upper_control_abs = abs(int(self.pv_ctrl.upperControlLimit))
_max_control_abs = max(_lower_control_abs, _upper_control_abs)
# NOTE(review): max() of two ints is never None, so this check cannot trigger.
if _max_control_abs is None:
_max_control_abs = 0
_enum_list = self.pv_ctrl.enumStrings
if len(_enum_list) > 0:
# Enum pv: use the longest enum string as the template text.
_enum_list_member_max_length = 0
_enum_list_member_max_index = 0
for i in range(0, len(_enum_list)):
if len(_enum_list[i]) > _enum_list_member_max_length:
_enum_list_member_max_length = len(_enum_list[i])
_enum_list_member_max_index = i
_suggested_text_from_value += \
_enum_list[_enum_list_member_max_index] + "." #Add extra space
else:
# Numeric pv: sign, largest control limit, then one "0" per decimal place.
if self.pv_ctrl.lowerControlLimit < 0:
_suggested_text_from_value += "-"
_suggested_text_from_value += str(_max_control_abs) + "."
#print("precision", self.precision, self.pv_name)
self.precision = min(9, self.precision) #safety net
for i in range (0, self.precision):
_suggested_text_from_value += "0"
# NOTE(review): len() raises TypeError if getCache() returned None above;
# receive_connect_update guards against a None cache value - confirm here.
if len(_current_value) > len(_suggested_text_from_value):
_suggested_text_from_value = _current_value
self.suggested_text += _suggested_text_from_value
if self.show_units:
self.suggested_text += " " + self.units
self.suggested_text += self.suffix
# Pad the text with one space on either side.
_suggested_text_length = len(self.suggested_text)
self.suggested_text = self.suggested_text.center(
_suggested_text_length+2)
self.max_control_abs_str = str(_max_control_abs)
_max_control_abs_length = len(self.max_control_abs_str)
_offset = 9
self.max_control_abs_str = self.max_control_abs_str.center(
_max_control_abs_length + _offset)
# Settings are grouped as Widget/<pv name>/<widget class>.
qsettings = QSettings()
qsettings.beginGroup("Widget")
qsettings.beginGroup(self.pv_name)
qsettings.beginGroup(self.widget_class)
#_var_base = "Widget/" + self.pv_name + "/" + self.widget_class + "/"
_var_text = "suggested_text"
_ctrl_abs = "max_control_abs_str"
if self.cafe.isConnected(self.handle) and \
self.cafe.initCallbackComplete(self.handle):
# Connected: remember the freshly computed strings.
qsettings.setValue(_var_text, self.suggested_text)
qsettings.setValue(_ctrl_abs, self.max_control_abs_str)
else:
# Not connected: fall back to the strings stored previously, if any.
if qsettings.value(_var_text) is not None:
self.suggested_text = qsettings.value(_var_text)
if qsettings.value(_ctrl_abs) is not None:
self.max_control_abs_str = qsettings.value(_ctrl_abs)
qsettings.endGroup()
qsettings.endGroup()
qsettings.endGroup()
def is_initialize_complete(self):
    '''Wait for initialization to complete, retrying the meta data setup.

    initialize_meta_data() is retried every 10 ms, at most 51 times.

    Returns:
        True as soon as initialize_complete is set, False once the retries
        are used up.
    '''
    for _attempt in range(51):
        if self.initialize_complete:
            return True
        time.sleep(0.01)
        self.initialize_meta_data()
    return False
def cleanup(self, close_pv = True):
    '''Clean up the widget.

    Stops this widget's monitor if it is still registered for the handle,
    closes the pv when no other monitor is left on the handle and schedules
    the widget for deletion.

    Args:
        close_pv: Close the channel (only when it has no monitors left).
    '''
    # QWidget::close() is basically a combination of QWidget::closeEvent(),
    # QWidget::hide() and QObject::deleteLater() (if Qt::WA_DeleteOnClose
    # is set).
    if self.handle > 0:
        # Make sure the monitor id is (still) valid before stopping it.
        if self.monitor_id in self.cafe.getMonitorIDs(self.handle):
            self.cafe.monitorStop(self.handle, self.monitor_id)
        # Do not close if there are other monitors on this handle: they
        # belong to other widgets sharing the channel.
        if self.cafe.getNoMonitors(self.handle) == 0:
            if close_pv is True:
                self.cafe.close(self.pv_name)
    self.widget.deleteLater()
def format_display_value(self, value):
    '''Return value as display text with units, suffix and prefix applied.

    Strings are used as they are, integers are converted with str() and
    every other value is rounded and formatted with self.precision decimal
    places, with a leading space reserved for the sign of positive numbers.

    Args:
        value: The value to display; None yields an empty string.

    Returns:
        The formatted text.
    '''
    if value is None:
        print(self, self.pv_name, ">>>>>>>>>>>>format_display_value is None>>>>>>")
        # round(None, ...) would raise TypeError; nothing to display.
        return ""
    if isinstance(value, str):
        _value_str = value
    elif isinstance(value, int):
        _value_str = str(value)
    else:
        # The blank in the format spec reserves space for positive numbers.
        _value_form = "{:< .%sf}" % self.precision
        _value_str = _value_form.format(round(value, self.precision))
    if self.show_units:
        _value_str += " " + self.units + " "
    # Compare by value: identity against a string literal is unreliable.
    if self.suffix != "":
        _value_str += " " + self.suffix + " "
    if self.prefix != "":
        _space = ""
        # Separate prefix and value when negative values are possible.
        if self.pv_ctrl is not None and self.pv_ctrl.lowerDisplayLimit < 0:
            _space = " "
        _value_str = self.prefix + _space + _value_str
    return _value_str
def post_display_value(self, value):
    '''Format value and show it through the widget's setText() method.

    For Qt >= 5.3 the widget's signals are blocked while the text is set,
    so that the programmatic update does not trigger the widget's own
    change signals. Widgets without setText() only get a diagnostic
    printed.

    Args:
        value: The value to display, see format_display_value().
    '''
    _value_str = self.format_display_value(value)
    if not hasattr(self, "setText"):
        print("setText method does not exist for this widget class:\n", self.widget.__class__)
        print("sender was: ", self.sender())
        return
    if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
        # blockSignals() returns the previous state: restore it instead of
        # unconditionally unblocking, also when setText() raises.
        _was_blocked = self.blockSignals(True)
        try:
            self.setText(_value_str)
        finally:
            self.blockSignals(_was_blocked)
    else:
        self.setText(_value_str)
def py_connect_callback(self, handle, pvname, status):
    '''Callback function to be invoked on change of pv connection status.

    Forwards the change through the trigger_connect signal as
    (int handle, str pv name, int status).
    '''
    _signal_args = (int(handle), str(pvname), int(status))
    self.trigger_connect.emit(*_signal_args)
def receive_connect_update(self, handle, pv_name, status, post_display=True):
'''Triggered by connect signal. For Widget to overload.

On connection the control data, channel info, record type, precision and
units are refreshed, the cached value (or msg_label) is optionally posted
and qt_property_reconnect() is called; any other status leads to
qt_property_disconnect(). A closed channel sets initialize_again, which
restarts the monitor on a later update.
'''
# NOTE(review): leading indentation is missing in this copy of the file, so
# the statements below are left untouched rather than re-nested by guesswork.
#print(" receive _connect_callback:: START ")
#print ("RRReceive_connect triggered for widget", self, "with status", status)
#print ("RRReceive_connect triggered for widget", handle, pv_name, status)
# NOTE(review): _alarm_severity is assigned but not used in this method.
_alarm_severity = None
if status == self.cyca.ICAFE_CS_CONN:
# NOTE(review): initialize_connect is not read in the visible part of the file.
self.initialize_connect = True
self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
self.pv_info = self.cafe.getChannelInfo(self.handle)
if self.pv_info is not None and self.record_type is None:
# Class name unavailable: read the record type from <record>.RTYP instead.
if "Not Supported" in self.pv_info.className:
_rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
self.record_type = _rtype if _rtype is not None else self.pv_info.className
# The RTYP channel is closed again; the value assigned to _rtype here is unused.
_rtype = self.cafe.close(self.pv_name.split(".")[0] + ".RTYP")
#print(self.pv_name)
#print("record type====>", self.record_type)
else:
self.record_type = self.pv_info.className
self.set_precision_and_units(reconnectFlag=True)
#set_desc() would connect to a new channel
#if self.desc is None:
# self.set_desc()
#print("msg_lab", self.msg_label, "/", len(self.msg_label))
if self.msg_label == "":
_value = self.cafe.getCache(handle, dt='native')
#Another reconnection in progress!!!
# NOTE(review): "is None" is the idiomatic test here.
if _value == None:
return
else:
# A fixed label takes the place of the pv value.
_value = self.msg_label
#print("_value", _value, "/", len(_value))
#print("_value", _value, "/")
if post_display:
self.post_display_value(_value)
self.qt_property_reconnect()
else:
self.qt_property_disconnect()
if status == self.cyca.ICAFE_CS_CLOSED:
self.initialize_again = True
elif self.initialize_again:
#monitor_id informs whether or not widget has a monitor
#CAQMessageButton for instance does not have a monitor
if not self.pv_within_daq_group and self.monitor_id is not None:
self.monitor_start()
#print("RESTART MONITOR FOR THIS WIDGET", flush=True)
self.initialize_again = False
#print(" receive _connect_callback:: END ")
return
def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
''' DAQ mode is widget specific.
DAQ may be in BS mode, but channels within DAQ stream that
are not BS enabled will be flagged as CA Mode, i.e., CARead

Slot for the trigger_daq* signals. The object name follows daq_mode, the
dynamic property follows daq_state and, for normal status, the value is
posted and the alarm severity is mapped onto the alarm properties.
'''
# NOTE(review): leading indentation is missing in this copy of the file, so
# the statements below are left untouched rather than re-nested by guesswork.
_current_qt_dynamic_property = self.qt_dynamic_property_get()
alarm_severity = daq_pvd.alarmSeverity
self.pvd = daq_pvd
#print("BEFORE mode, object_name, daqState", daq_mode, self.qt_object_name, daq_state)
# A change of DAQ mode changes the object name, so the style is re-polished.
if daq_mode != self.qt_object_name:
self.qt_object_name = daq_mode
self.setObjectName(self.qt_object_name)
self.qt_style_polish()
#print("AFTER mode, object_name, daqState", daq_mode, self.qt_object_name, daq_state)
if daq_state in (self.cyca.ICAFE_DAQ_STOPPED,):
#if daq_state in (DAQState.CA_STOP, DAQState.BS_STOP, DAQState.CA_PAUSE,
# DAQState.BS_PAUSE):
if _current_qt_dynamic_property != self.DAQ_STOPPED:
self.qt_property_daq_stopped()
elif daq_state in (self.cyca.ICAFE_DAQ_PAUSED,):
if _current_qt_dynamic_property != self.DAQ_PAUSED:
self.qt_property_daq_paused()
elif daq_state in (self.cyca.ICAFE_DAQ_RUN,):
#if _current_qt_dynamic_property != self.READBACK_ALARM:
# self.qt_property_alarm_sev_no_alarm()
#print ("before", daq_mode, _current_qt_dynamic_property)
if daq_mode == self.PV_DAQ_BS and \
_current_qt_dynamic_property != self.READBACK_STATIC:
self.qt_property_static()
elif daq_mode == self.PV_DAQ_CA:
#if _current_qt_dynamic_property not in (self.READBACK_STATIC,
# self.READBACK_ALARM,):
if self.color_mode != self.color_mode_requested:
# NOTE(review): the next line is a comparison ("=="), not an assignment,
# so it has no effect. If an assignment was intended, note that
# color_mode_requested is None when no color mode was requested - confirm.
self.color_mode == self.color_mode_requested
#print("new colore mode")
# Enum pvs delivering an integer are displayed by their enum string.
if self.cafe.isEnum(self.handle) and \
isinstance(daq_pvd.value[0], int):
_value = self.cafe.getStringFromEnum(self.handle,
daq_pvd.value[0])
else:
_value = daq_pvd.value[0]
if daq_pvd.status == self.cyca.ICAFE_NORMAL:
if self.msg_label == "":
self.post_display_value(_value)
# No alarm colouring for BS mode.
if daq_mode == self.PV_DAQ_BS:
return
#For DAQ when channel connects after application start-up
#if _current_qt_dynamic_property == self.DISCONNECTED:
# self.qt_property_initial_values(qt_object_name = self.PV_READBACK)
#Check if color settings are correct
##if _current_qt_dynamic_property == self.READBACK_STATIC and \
# Any alarm switches the widget (and the requested mode) to alarm colouring.
if alarm_severity > self.cyca.SEV_NO_ALARM:
self.color_mode = self.READBACK_ALARM
self.color_mode_requested = self.READBACK_ALARM
if self.color_mode == self.READBACK_ALARM:
# Only touch the property when the severity actually changed.
if alarm_severity == self.cyca.SEV_MINOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
self.qt_property_alarm_sev_minor()
elif alarm_severity == self.cyca.SEV_MAJOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
self.qt_property_alarm_sev_major()
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
elif alarm_severity == self.cyca.SEV_NO_ALARM:
if _current_qt_dynamic_property != self.ALARM_SEV_NO_ALARM:
self.qt_property_alarm_sev_no_alarm()
elif _current_qt_dynamic_property != self.READBACK_STATIC:
self.qt_property_static()
#print ("after", daq_mode, self.qt_dynamic_property_get() )
else:
if _current_qt_dynamic_property != self.DISCONNECTED:
self.qt_property_disconnect()
def receive_monitor_dbr_time(self, pvdata, alarm_severity):
    '''Slot connected to trigger_monitor; only prints the pv name.

    Neither pvdata nor alarm_severity is used here.
    '''
    _pv = self.pv_name
    print("in gateway", _pv)
def receive_monitor_update(self, value, status, alarm_severity):
'''Triggered by monitor signal. For Widget to overload.

For normal status the value is posted (unless msg_label is set) and the
alarm severity is mapped onto the dynamic alarm properties; any other
status marks the widget as disconnected. The whole update is wrapped in
mutex_post_display.
'''
# NOTE(review): leading indentation is missing in this copy of the file, so
# the statements below are left untouched rather than re-nested by guesswork.
# NOTE(review): unlock() at the end is not reached if anything in between
# raises; consider QMutexLocker or try/finally.
self.mutex_post_display.lock()
_current_qt_dynamic_property = self.qt_dynamic_property_get()
#print(self.pv_name, value, status, alarm_severity)
#if isinstance(value, (int, float)):
# if value < 100:
# print("CURRENT PROPERY VALUE", self.pv_name, _current_qt_dynamic_property, value, status, alarm_severity )
#print ("sender //2//", self.sender(), value)
#print("receive monitor update/1", self.pv_name, self.qt_object_name, self._qt_property_selected)
if status == self.cyca.ICAFE_NORMAL:
# The string literal below is disabled test code that fakes alarm severities.
'''
if isinstance(value, (int, float)):
if value < -20:
alarm_severity = self.cyca.SEV_INVALID
elif value < -1:
alarm_severity = self.cyca.SEV_MAJOR
elif value < 5:
alarm_severity = self.cyca.SEV_MINOR
else:
alarm_severity = self.cyca.SEV_NO_ALARM
'''
if self.msg_label == "":
self.post_display_value(value)
#For DAQ when channel connects after application start-up
if _current_qt_dynamic_property == self.DISCONNECTED:
self.qt_property_initial_values(qt_object_name = self.PV_READBACK)
#Check if color settings are correct
elif _current_qt_dynamic_property == self.READBACK_STATIC:
# A minor/major alarm on a "static" widget switches it to alarm colouring.
if alarm_severity > self.cyca.SEV_NO_ALARM and \
alarm_severity < self.cyca.SEV_INVALID:
self.color_mode = self.READBACK_ALARM
self.status_tip = "Widget color mode is dynamic, pv with alarm limits"
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
if self.color_mode == self.READBACK_ALARM:
# Only touch the property when the severity actually changed.
if alarm_severity == self.cyca.SEV_MINOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
self.qt_property_alarm_sev_minor()
elif alarm_severity == self.cyca.SEV_MAJOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
self.qt_property_alarm_sev_major()
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
elif alarm_severity == self.cyca.SEV_NO_ALARM:
if _current_qt_dynamic_property != self.ALARM_SEV_NO_ALARM:
self.qt_property_alarm_sev_no_alarm()
else:
if _current_qt_dynamic_property != self.DISCONNECTED:
self.qt_property_disconnect()
self.mutex_post_display.unlock()
#print("receive monitor update/2", self.pv_name, self.qt_object_name, self._qt_property_selected)
def py_monitor_callback(self, handle, pvname, pvdata):
    '''Callback function to be invoked on change of pv value.

    cafe.getCache and cafe.set operations permitted within callback.

    The pv data is stored in self.pvd and forwarded through a signal:
    trigger_monitor with the full pv data when monitor_dbr_time is set,
    otherwise trigger_monitor_str/int/float according to the type of the
    first value element, together with status and alarm severity.

    Args:
        handle: Handle of the pv that was updated.
        pvname: Name of the pv that was updated.
        pvdata: Pv data object providing status, alarmSeverity and value.
    '''
    # The gateway may not have been set up (see the early returns in
    # __init__), in which case there is nothing to forward to.
    if not hasattr(self, 'cafe'):
        print ("py_monitor_callback: name/handle self cafe is NONE =>>>>>>>>>>>> ",
               pvname, handle)
        return
    self.pvd = pvdata

    if pvdata.status == self.cyca.ICAFE_CS_NEVER_CONN:
        print("initialize again")
        self.initialize()
        # No alarm severity is defined for this case; emitting here used to
        # fail with an unbound local. initialize() reports the connection
        # state itself.
        return
    if pvdata.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        # A lost connection is signalled through the severity argument.
        _alarm_severity = self.cyca.ICAFE_CA_OP_CONN_DOWN
    else:
        _alarm_severity = pvdata.alarmSeverity

    if self.monitor_dbr_time:
        self.trigger_monitor.emit(pvdata, _alarm_severity)
        return
    _first = pvdata.value[0]
    if isinstance(_first, str):
        self.trigger_monitor_str.emit(_first, pvdata.status, _alarm_severity)
    elif isinstance(_first, int):
        self.trigger_monitor_int.emit(_first, pvdata.status, _alarm_severity)
    else:
        self.trigger_monitor_float.emit(float(_first), pvdata.status,
                                        _alarm_severity)
def monitor_start(self):
    '''Initiate monitor on pv.

    Nothing happens without a valid handle. With notify_unison the monitor
    is started without a callback; otherwise the user supplied
    monitor_callback is used, or the gateway's py_monitor_callback when
    none was given. The monitor id is stored in self.monitor_id.
    '''
    if not self.handle > 0:
        return
    if self.notify_unison:
        self.monitor_id = self.cafe.monitorStart(
            self.handle, dbr=self.cyca.CY_DBR_TIME)
        return
    # Fall back to the gateway supplied monitor callback handler.
    _callback = (self.py_monitor_callback if self.monitor_callback is None
                 else self.monitor_callback)
    self.monitor_id = self.cafe.monitorStart(
        self.handle, cb=_callback,
        dbr=self.cyca.CY_DBR_TIME,
        notify_milliseconds=self.notify_milliseconds)
def monitor_stop(self):
    '''Stop this widget's monitor, whether it is active or still in waiting.

    Nothing happens without a valid handle or when monitor_id is not
    among the monitor ids known for the handle.
    '''
    if not self.handle > 0:
        return
    _active_ids = self.cafe.getMonitorIDs(self.handle)
    _waiting_ids = self.cafe.getMonitorIDsInWaiting(self.handle)
    if self.monitor_id in _active_ids + _waiting_ids:
        self.cafe.monitorStop(self.handle, self.monitor_id)
def reconnect_channel(self):
    '''Ask cafe to reconnect this widget's channel.'''
    # cafe.reconnect() takes a list of handles.
    _handles = [self.handle]
    self.cafe.reconnect(_handles)
def set_desc(self):
    '''Set description of pv from pv.DESC.

    The description known to cafe for the handle is preferred; an already
    stored description is kept. Otherwise the handle is supplemented and,
    as a back-up, the <record>.DESC channel is read directly. A channel
    opened here for that purpose is closed again.
    '''
    if self.cafe.hasDescription(self.handle):
        self.desc = self.cafe.getDescription(self.handle)
        return
    if self.desc is not None:
        return

    self.cafe.supplementHandle(self.handle)
    if self.cafe.hasDescription(self.handle):
        self.desc = self.cafe.getDescription(self.handle)
        if self.desc is not None:
            return

    # Back-up solution: swap the field part of the pv name for ".DESC".
    _dot = str(self.pv_name).find(".")
    if _dot == -1:
        _desc_pv = self.pv_name + ".DESC"
    else:
        _desc_pv = str(self.pv_name)[0:_dot] + ".DESC"

    _desc_handle = self.cafe.getHandleFromPVName(_desc_pv)
    _opened_here = _desc_handle == 0
    if _opened_here:
        self.cafe.openPrepare()
        _desc_handle = self.cafe.open(_desc_pv)
        self.cafe.openNowAndWait(self.timeout, _desc_handle)
        time.sleep(0.001)

    if not self.cafe.isConnected(_desc_handle):
        self.desc = None
    else:
        self.desc = self.cafe.getCache(_desc_handle, 'str')
        if self.desc is None:
            self.desc = self.cafe.get(_desc_handle, 'str')

    if _opened_here:
        self.cafe.close(_desc_handle)
def set_precision_and_units(self, reconnectFlag: bool = False):
    '''Set the pv precision and units.

    The control data is (re)read from the cafe cache when it is missing or
    on reconnect. The pv precision is only taken over when the user gave
    none. On reconnect, show_units is switched off when the suffix already
    equals the units.

    Args:
        reconnectFlag: True when called because the channel reconnected.
    '''
    _needs_refresh = self.pv_ctrl is None or reconnectFlag is True
    if _needs_refresh:
        self.pv_ctrl = self.cafe.getCtrlCache(self.handle)

    if self.pv_ctrl is not None:
        if not self.has_precision_user:
            self.precision = self.pv_ctrl.precision
        self.units = ("" if self.pv_ctrl.units is None
                      else str(self.pv_ctrl.units))

    if reconnectFlag is not True:
        return
    # Verify user input: do not show the units twice.
    if self.show_units is True and self.suffix is not None \
            and self.suffix == self.units:
        self.show_units = False
def _qt_readback_color_mode(self):
    '''Determine the readback color mode from CAFE.

    The mode depends on whether the pv has alarm limits
    (self.color_mode = READBACK_ALARM, i.e. 'alarm') or is without alarm
    limits (self.color_mode = READBACK_STATIC, i.e. 'static'). A color mode
    of READBACK_ALARM already set by the user is kept, and nothing changes
    while the channel is not connected. status_tip is set alongside.
    '''
    # Already set by user. Compare by value: two equal strings are not
    # guaranteed to be the same object.
    if self.color_mode == self.READBACK_ALARM:
        return
    if self.cafe.isConnected(self.handle):
        pvd = self.cafe.getPVCache(self.handle)
        if pvd.alarmSeverity in (self.cyca.SEV_MINOR, self.cyca.SEV_MAJOR) or \
                self.cafe.hasAlarmStatusSeverity(self.handle):
            self.color_mode = self.READBACK_ALARM
            self.status_tip = "Widget color mode is dynamic, pv with alarm limits"
        else:
            self.color_mode = self.READBACK_STATIC
            self.status_tip = "Widget color mode is static, pv without alarm limits"
def qt_property_initial_values(self, qt_object_name: str = None, tool_tip: bool = True):
    '''Set Qt property values.

    Stores and applies the object name, selects a private copy of the
    property dictionary belonging to it and, when the channel is
    connected, activates the property of the resulting color mode. A
    channel that is not connected is marked as disconnected.

    Args:
        qt_object_name: One of PV_CONTROLLER, PV_READBACK, PV_DAQ_BS or
            PV_DAQ_CA.
        tool_tip: Use the pv name as the widget's tool tip.
    '''
    self.qt_object_name = qt_object_name
    if tool_tip:
        self.setToolTip(self.pv_name)
    self.setObjectName(self.qt_object_name)

    if self.qt_object_name in self.qt_object_to_property:
        # Deep copy, so that the per-object-name templates stay untouched.
        self._qt_property_selected = copy.deepcopy(
            self.qt_object_to_property[self.qt_object_name])
    else:
        print ("qt_property_initial_values: Object not found in dictionary")

    if not self.cafe.isConnected(self.handle):
        self.qt_property_disconnect()
        return

    _name = self.qt_object_name
    if _name == self.PV_READBACK or _name == self.PV_DAQ_CA:
        self._qt_readback_color_mode()
    elif _name == self.PV_CONTROLLER:
        # Controllers not flagged as acting on the beam get the other mode.
        if self.color_mode != self.ACT_ON_BEAM:
            self.color_mode = self.NOT_ACT_ON_BEAM
    elif _name == self.PV_DAQ_BS:
        self.color_mode = self.READBACK_STATIC
    self._qt_dynamic_property_set(self.color_mode)
def qt_dynamic_property_get(self, property_state : str = None):
    '''Look up a dynamic property in the selected-property table.

    Args:
        property_state: Name of the property to query, or None.

    Returns:
        The stored flag of ``property_state`` when a name is given;
        otherwise the name of the first property whose flag is truthy.
        None when nothing matches.
    '''
    entries = self._qt_property_selected.items()
    if property_state is None:
        return next((name for name, flag in entries if flag), None)
    return next(
        (flag for name, flag in entries if name == property_state), None)
def _qt_dynamic_property_set(self, property_state : str = None):
    '''Make ``property_state`` the only dynamic property that is True.

    Every property in the selected-property table is written both to the
    Qt widget (``setProperty``) and back into the table. With None, or a
    name that is not in the table, all properties end up False.
    '''
    for name in self._qt_property_selected:
        is_selected = name == property_state
        self.setProperty(name, is_selected)
        self._qt_property_selected[name] = is_selected
def qt_property_disconnect(self, redraw=False):
    '''Flag the widget as disconnected and re-apply its style.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.DISCONNECTED)
    self.qt_style_polish(redraw=redraw)
def qt_property_reconnect(self, redraw=False):
    '''Restore the connected-state dynamic property and re-apply the style.

    For a readback widget the color mode is re-derived from the channel;
    for a controller widget any mode other than ``ACT_ON_BEAM`` is reset to
    ``NOT_ACT_ON_BEAM``. The resulting color mode becomes the active
    dynamic property.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    if self.qt_object_name == self.PV_READBACK:
        self._qt_readback_color_mode()
    elif self.qt_object_name == self.PV_CONTROLLER:
        if self.color_mode != self.ACT_ON_BEAM:
            self.color_mode = self.NOT_ACT_ON_BEAM
    self._qt_dynamic_property_set(self.color_mode)
    self.qt_style_polish(redraw=redraw)
def qt_property_alarm_sev_major(self, redraw=False):
    '''Activate the MAJOR alarm property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.ALARM_SEV_MAJOR)
    self.setStatusTip("{0} reports value in MAJOR alarm state!".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_alarm_sev_minor(self, redraw=False):
    '''Activate the MINOR alarm property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.ALARM_SEV_MINOR)
    self.setStatusTip("{0} reports value in MINOR alarm state!".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_alarm_sev_no_alarm(self, redraw=False):
    '''Activate the READBACK_ALARM (no alarm) property and restyle.

    The status tip is set to report the normal state.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.READBACK_ALARM)
    self.setStatusTip("{0} reports value in normal state".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_alarm_sev_invalid(self, redraw=False):
    '''Activate the INVALID alarm property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.ALARM_SEV_INVALID)
    self.setStatusTip("PV={0} reports an INVALID value!".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_static(self, redraw=False):
    '''Activate the READBACK_STATIC property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.READBACK_STATIC)
    self.setStatusTip("PV={0} does not have an alarm state".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_daq_stopped(self, redraw=False):
    '''Activate the DAQ_STOPPED property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.DAQ_STOPPED)
    self.setStatusTip("PV={0} reports DAQ has stopped".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_property_daq_paused(self, redraw=False):
    '''Activate the DAQ_PAUSED property, update the status tip and restyle.

    Args:
        redraw: Forwarded to ``qt_style_polish``. It used to be accepted
            but dropped, so a requested redraw never took place.
    '''
    self._qt_dynamic_property_set(self.DAQ_PAUSED)
    self.setStatusTip("PV={0} reports DAQ has paused".format(self.pv_name))
    self.qt_style_polish(redraw=redraw)
def qt_style_polish(self, redraw=False):
    '''Re-apply the widget style after a dynamic property change.

    Args:
        redraw: When falsy the style is only polished. When truthy the
            style is unpolished and polished again, a StyleChange event is
            sent to the widget and an update of the widget and its geometry
            is requested.

    Pending Qt events are processed afterwards in both cases.
    '''
    if not redraw:
        self.style().polish(self)
    else:
        self.style().unpolish(self)
        self.style().polish(self)
        style_change = QEvent(QEvent.StyleChange)
        QApplication.sendEvent(self, style_change)
        self.update()
        self.updateGeometry()
    QApplication.processEvents()
def pv_status_text_header(self, source="Channel Access"):
    '''Build the HTML header of the pv status dialog.

    The header names the widget (class, Qt object name, color mode), the
    press/release values when they are set, the data source and the pv. A
    description line is appended unless the description is the empty
    string; ``set_desc`` is called first when the description is None.

    Args:
        source: Data source label shown in the header.

    Returns:
        The header as an HTML string.
    '''
    separator = "----------------------------------------"
    highlight = "DarkOrchid"

    fragments = [
        ("\n"
         "<p style = 'text-align: left';>\n"
         "Widget: {0} ({1}, {2}) <br>\n").format(
             self.widget_class, self.qt_object_name, self.color_mode)]

    if self.msg_press_value is not None:
        fragments.append(
            ("\n"
             "<font color={1}>On press, sends value: {0} </font> <br>\n"
             ).format(self.msg_press_value, highlight))
    if self.msg_release_value is not None:
        fragments.append(
            ("\n"
             "<font color={1}>On release, sends value: {0} </font> <br>\n"
             ).format(self.msg_release_value, highlight))

    # Only a beam synchronous DAQ widget gets the distinct source color.
    beam_synchronous = (self.pv_within_daq_group
                        and self.qt_object_name in (self.PV_DAQ_BS,))
    source_color = "Navy Blue" if beam_synchronous else "Black"
    fragments.append(
        ("\n"
         "{0} <br> <font color={4}>\n"
         "Data source: {1} </font><br>\n"
         "{0} <br>\n"
         "PV: <font color={3}>{2}</font>\n").format(
             separator, source, self.pv_name, highlight, source_color))

    if self.desc is None:
        self.set_desc()
    if self.desc == "":
        fragments.append("</p>\n")
    else:
        fragments.append(
            ("\n"
             "<br>\n"
             "Description:<font color={7}> {6} </font>\n"
             "</p>\n").format(
                 self.widget_class, self.qt_object_name, self.color_mode,
                 separator, source, self.pv_name, self.desc, highlight))
    return "".join(fragments)
def pv_status_text_enum(self):
    '''Build the HTML "Value" line for an enumerated pv.

    The first element of ``self.pvd.value`` is shown together with its
    counterpart: the enum index when the value is a string, the enum string
    when it is anything else but None.

    The value is colored white instead of blue when it is not current:
    inside a DAQ group when the widget's active dynamic property is
    stopped/paused/disconnected (beam synchronous) or stopped/disconnected
    (channel access); outside a DAQ group when the channel is not connected
    or the cached status is ICAFE_CA_OP_CONN_DOWN.

    Returns:
        The value line as an HTML string.
    '''
    value = self.pvd.value[0]
    counterpart = None
    if isinstance(value, str):
        counterpart = self.cafe.getEnumFromString(self.handle, value)
    elif value is not None:
        counterpart = self.cafe.getStringFromEnum(self.handle, value)

    color = "Blue"
    if self.pv_within_daq_group:
        # Fixed: `name in (self.PV_DAQ_BS)` has no tuple (missing comma), so
        # it was a substring test that raised TypeError for a None name and
        # matched partial names. Compare for equality instead.
        if self.qt_object_name == self.PV_DAQ_BS:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DAQ_PAUSED,
                                                  self.DISCONNECTED):
                color = "White"
        elif self.qt_object_name == self.PV_DAQ_CA:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DISCONNECTED):
                color = "White"
    elif not self.cafe.isConnected(self.handle):
        color = "White"
    elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        color = "White"

    return ("\n"
            "<p style = 'text-align: left';>\n"
            "Value: <font color={0}>{1} [{2}]</font> <br>\n"
            ).format(color, value, counterpart)
def pv_status_text_data(self):
    '''Build the HTML "Value" line for a scalar or waveform pv.

    Up to the first nine elements are listed, separated by blanks and
    wrapped in brackets when more than one is shown; for longer waveforms
    the last element follows after an ellipsis. Strings and ints are shown
    as they are, a None element as '0'. Other values are formatted with
    the precision of ``self.pv_ctrl`` and are left out when ``pv_ctrl`` is
    None. The units are appended.

    The value is colored white instead of blue when it is not current:
    inside a DAQ group when the widget's active dynamic property is
    stopped/paused/disconnected (beam synchronous) or stopped/disconnected
    (channel access); outside a DAQ group when the channel is not connected
    or the cached status is ICAFE_CA_OP_CONN_DOWN.

    Returns:
        The value line as an HTML string.
    '''
    first_end = 9
    nelem = self.pvd.nelem
    end_range = min(nelem, first_end)

    def _format_element(element):
        # Text for one element; shared by the leading and the last element.
        if element is None:
            element = '0'
        if isinstance(element, str):
            return element
        if isinstance(element, int):
            return str(element)
        if self.pv_ctrl is None:
            return ""
        precision = self.pv_ctrl.precision
        return ("{:<.%sf}" % precision).format(round(element, precision))

    value_str = "[ " if end_range > 1 else ""
    value_str += " ".join(
        _format_element(self.pvd.value[i]) for i in range(end_range))
    if nelem > first_end:
        # Fixed: this branch used the undefined name `value` for integer
        # data (NameError) and had no guard for a None last element.
        value_str += " ... " + _format_element(self.pvd.value[nelem - 1]) + " "
    if end_range > 1:
        value_str += "]"

    color = "Blue"
    if self.pv_within_daq_group:
        # Fixed: `name in (self.PV_DAQ_BS)` has no tuple (missing comma), so
        # it was a substring test that raised TypeError for a None name.
        if self.qt_object_name == self.PV_DAQ_BS:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DAQ_PAUSED,
                                                  self.DISCONNECTED):
                color = "White"
        elif self.qt_object_name == self.PV_DAQ_CA:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DISCONNECTED):
                color = "White"
    elif not self.cafe.isConnected(self.handle):
        color = "White"
    elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        color = "White"

    return ("\n"
            "<p style = 'text-align: left';>\n"
            "Value: <font color={0}>{1} {2}</font> <br>\n"
            ).format(color, value_str, self.units)
def pv_status_text_timestamp(self):
    '''Build the HTML timestamp and status lines of the pv status dialog.

    The timestamp is blue and the status DimGray by default. Inside a DAQ
    group both turn white when the widget's active dynamic property is
    stopped/paused/disconnected (beam synchronous) or stopped/disconnected
    (channel access). Outside a DAQ group the timestamp turns white when
    the channel is not connected or the cached status is
    ICAFE_CA_OP_CONN_DOWN. Any status other than ICAFE_NORMAL is finally
    shown in IndianRed.

    Returns:
        The timestamp/status lines as an HTML string.
    '''
    ts_color = "Blue"
    status_color = "DimGray"

    if self.pv_within_daq_group:
        # Fixed: `name in (self.PV_DAQ_BS)` has no tuple (missing comma), so
        # it was a substring test that raised TypeError for a None name.
        if self.qt_object_name == self.PV_DAQ_BS:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DAQ_PAUSED,
                                                  self.DISCONNECTED):
                ts_color = "White"
                status_color = "White"
        elif self.qt_object_name == self.PV_DAQ_CA:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DISCONNECTED):
                ts_color = "White"
                status_color = "White"
    elif not self.cafe.isConnected(self.handle):
        ts_color = "White"
    elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        ts_color = "White"

    # A non-normal status overrides whatever color was chosen above.
    if self.pvd.status != self.cyca.ICAFE_NORMAL:
        status_color = "IndianRed"

    return ("\n"
            "Timestamp: <font color={0}>{2}</font> <br>\n"
            "Status: <font color={1}>{3} <br> {4} </font>\n"
            ).format(ts_color, status_color, self.pvd.tsDateAsString,
                     self.pvd.statusAsString,
                     self.cafe.getStatusInfo(self.pvd.status))
def pv_status_text_alarm(self):
    '''Build the HTML alarm status/severity lines of the pv status dialog.

    The color follows the cached alarm severity: Yellow for MINOR, Red for
    MAJOR, White for INVALID, DimGray otherwise. Inside a DAQ group it is
    overridden with White when the widget's active dynamic property is
    stopped/paused/disconnected (beam synchronous) or stopped/disconnected
    (channel access). Outside a DAQ group White is used, ahead of the
    severity color, when the channel is not connected or the cached status
    is ICAFE_CA_OP_CONN_DOWN.

    Returns:
        The alarm lines as an HTML string.
    '''
    def _severity_color():
        # Color of the cached alarm severity, DimGray when there is none.
        severity = self.pvd.alarmSeverity
        if severity == self.cyca.SEV_MINOR:
            return "Yellow"
        if severity == self.cyca.SEV_MAJOR:
            return "Red"
        if severity == self.cyca.SEV_INVALID:
            return "White"
        return "DimGray"

    if self.pv_within_daq_group:
        color = _severity_color()
        # Fixed: `name in (self.PV_DAQ_BS)` has no tuple (missing comma), so
        # it was a substring test that raised TypeError for a None name.
        if self.qt_object_name == self.PV_DAQ_BS:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DAQ_PAUSED,
                                                  self.DISCONNECTED):
                color = "White"
        elif self.qt_object_name == self.PV_DAQ_CA:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DISCONNECTED):
                color = "White"
    elif not self.cafe.isConnected(self.handle):
        color = "White"
    elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        color = "White"
    else:
        color = _severity_color()

    text = "\n"
    text += (" <br>\n"
             "Alarm status: <font color={0}>{1}</font> <br>\n"
             "Alarm severity: <font color={0}>{2}</font>\n"
             ).format(color, self.pvd.alarmStatusAsString,
                      self.pvd.alarmSeverityAsString)
    return text
def pv_access(self):
    '''Return the channel's access rights as text.

    The channel info is fetched from CAFE first when it has not been
    loaded yet.

    Returns:
        "Read", "Write", "ReadWrite" or "" depending on the access flags
        of the channel info.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)
    rights = []
    if self.pv_info.accessRead:
        rights.append("Read")
    if self.pv_info.accessWrite:
        rights.append("Write")
    return "".join(rights)
def pv_status_text_enum_metadata(self):
    '''Build the HTML metadata paragraph for an enumerated pv.

    Lists the enum strings, native data type, record type, read/write
    access and the IOC host name.

    Returns:
        The metadata paragraph as an HTML string.
    '''
    # Fixed: the channel info was read before anything had loaded it (the
    # lazy load inside pv_access() runs later in the argument list), so a
    # still unset pv_info raised AttributeError. Load it up front, as the
    # other metadata builders do.
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)

    return (" </p> <p>\n"
            "ENUM strings: <font color={1}>{2}</font> <br><br>\n"
            "Data type (native):<font color={0}> {3} </font><br>\n"
            "Record type: <font color={0}> {4} </font><br>\n"
            "RW Access:<font color={0}> {5} </font><br>\n"
            "IOC: <font color={0}> {6} </font></p>\n"
            ).format("MediumBlue", "DarkOrchid", self.pvc.enumStrings,
                     self.pv_info.dataTypeAsString,
                     self.record_type, self.pv_access(),
                     self.pv_info.hostName)
def pv_status_text_metadata(self):
    '''Build the HTML metadata paragraph for a non-enumerated pv.

    Loads the channel info and resolves the record type when they are not
    known yet. If the class name contains "Not Supported" the record type
    is read from the record's ``.RTYP`` field (that channel is closed
    again afterwards), falling back to the class name.

    For stringin/stringout records only data type, record type, access and
    IOC are listed; otherwise the precision is listed as well, preceded by
    the element count for waveforms.

    Returns:
        The metadata paragraph as an HTML string.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)

    if self.pv_info is not None and self.record_type is None:
        class_name = self.pv_info.className
        if "Not Supported" in class_name:
            rtyp_pv = self.pv_name.split(".")[0] + ".RTYP"
            rtype = self.cafe.get(rtyp_pv)
            self.record_type = (
                self.pv_info.className if rtype is None else rtype)
            self.cafe.close(rtyp_pv)
        else:
            self.record_type = class_name

    if self.record_type in ("stringin", "stringout"):
        return (" </p> <p>\n"
                "Data type (native):<font color={0}> {3} </font><br>\n"
                "Record type: <font color={0}> {4} </font><br>\n"
                "RW Access:<font color={0}> {5} </font><br>\n"
                "IOC: <font color={0}> {6} </font></p>\n"
                ).format("MediumBlue", self.pvd.nelem, self.pvc.precision,
                         self.pv_info.dataTypeAsString,
                         self.record_type, self.pv_access(),
                         self.pv_info.hostName)

    pieces = [" </p> <p>\n"]
    if self.pvd.nelem > 1:
        pieces.append(
            ("\n"
             "Nelem: <font color={0}> {1} </font><br>\n"
             ).format("MediumBlue", self.pvd.nelem))
    pieces.append(
        ("\n"
         "Precision (PV): <font color={0}>{1}</font> <br>\n"
         "Data type (native):<font color={0}> {2} </font><br>\n"
         "Record type: <font color={0}> {3} </font><br>\n"
         "RW Access:<font color={0}> {4} </font><br>\n"
         "IOC: <font color={0}> {5} </font></p>\n"
         ).format("MediumBlue", self.pvc.precision,
                  self.pv_info.dataTypeAsString,
                  self.record_type, self.pv_access(),
                  self.pv_info.hostName))
    return "".join(pieces)
def pv_status_text_alarm_limits(self):
    '''Build the HTML paragraph with the alarm and warning limits.

    Loads the channel info and resolves the record type when they are not
    known yet (via the record's ``.RTYP`` field if the class name contains
    "Not Supported"). Only a newline is returned when the record type is
    known but not among the alarm-capable record types, when all four
    limits are zero, or when CAFE reports no alarm status/severity for the
    channel.

    Returns:
        The limits paragraph as an HTML string, or a bare newline.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)

    if self.pv_info is not None and self.record_type is None:
        class_name = self.pv_info.className
        if "Not Supported" in class_name:
            rtyp_pv = self.pv_name.split(".")[0] + ".RTYP"
            rtype = self.cafe.get(rtyp_pv)
            self.record_type = (
                self.pv_info.className if rtype is None else rtype)
            self.cafe.close(rtyp_pv)
        else:
            self.record_type = class_name

    empty = "\n"
    # Not all record types have alarms. A record type that could not be
    # resolved ("Not Supported") is given the benefit of the doubt.
    record_type_known = "Not Supported" not in str(self.record_type)
    if record_type_known and \
            self.record_type not in self._alarm_severity_record_types:
        return empty

    limits = (self.pvc.lowerAlarmLimit, self.pvc.upperAlarmLimit,
              self.pvc.lowerWarningLimit, self.pvc.upperWarningLimit)
    if all(limit == 0 for limit in limits):
        return empty

    if not self.cafe.hasAlarmStatusSeverity(self.handle):
        return empty
    return (" <p>\n"
            "Lower/Upper <em>alarm</em> limit:&nbsp;&nbsp;&nbsp; "
            "<font color={0}>{1} &nbsp;/&nbsp; {4}</font> <br>\n"
            "Lower/Upper warning limit: "
            "<font color={0}>{2} &nbsp;/&nbsp; {3} </font>\n"
            "</p>\n"
            ).format("MediumBlue",
                     self.pvc.lowerAlarmLimit, self.pvc.lowerWarningLimit,
                     self.pvc.upperWarningLimit, self.pvc.upperAlarmLimit)
def pv_status_text_display_limits(self):
    '''Build the HTML paragraph with the control and display limits.

    Returns:
        The limits paragraph as an HTML string, or a bare newline when all
        four limits are zero.
    '''
    ctrl = self.pvc
    limits = (ctrl.lowerDisplayLimit, ctrl.upperDisplayLimit,
              ctrl.lowerControlLimit, ctrl.upperControlLimit)
    if all(limit == 0 for limit in limits):
        return "\n"
    return (" <p>\n"
            "Lower/Upper <em>control</em> limit: "
            "<font color={0}>{3} &nbsp;/&nbsp; {4} </font> <br>\n"
            "Lower/Upper display limit: "
            "<font color={0}> {1} &nbsp;/&nbsp; {2} </font>\n"
            "</p>\n"
            ).format("MediumBlue",
                     self.pvc.lowerDisplayLimit, self.pvc.upperDisplayLimit,
                     self.pvc.lowerControlLimit, self.pvc.upperControlLimit)
def pv_status_text(self):
    '''Compose the pv metadata text and show it in the widget's dialog box.

    The data source label depends on whether the pv belongs to a DAQ group
    and on the widget's Qt object name. The cached pv data is refreshed
    from CAFE except for beam synchronous DAQ widgets; the control cache is
    always refreshed. An invalid handle or a never-connected channel yields
    a short status message, enumerated pvs get the enum sections and all
    others the data, alarm-limit and display-limit sections.
    '''
    QApplication.processEvents()

    source = "Channel Access"
    if not self.pv_within_daq_group:
        self.pvd = self.cafe.getPVCache(self.handle)
    else:
        if self.qt_object_name == self.PV_DAQ_BS:
            # self.pvd is written to in receive_daq_update
            source = "DAQ (Beam Synchronous)"
        elif self.qt_object_name == self.PV_DAQ_CA:
            source = "DAQ (Channel Access)"
            self.pvd = self.cafe.getPVCache(self.handle)
        if self.pvd.pulseID > 0:
            source += "<br>Pulse ID: {0}".format(self.pvd.pulseID)

    self.pvc = self.cafe.getCtrlCache(self.handle)

    status_template = ("<p> Status: <font color={0}>{1} <br> {2}</font> </p>\n")
    status = self.pvd.status
    if status == self.cyca.ECAFE_INVALID_HANDLE:
        body = status_template.format(
            "Blue", "Channel closed while DAQ in STOP state.",
            "PV info requires DAQ to be in RUN/PAUSED state")
    elif status == self.cyca.ICAFE_CS_NEVER_CONN:
        body = status_template.format(
            "Red", self.pvd.statusAsString,
            self.cafe.getStatusInfo(self.pvd.status))
    elif self.pvc.noEnumStrings > 0:
        body = "".join([
            self.pv_status_text_enum(),
            self.pv_status_text_timestamp(),
            self.pv_status_text_alarm(),
            self.pv_status_text_enum_metadata()])
    else:
        body = "".join([
            self.pv_status_text_data(),
            self.pv_status_text_timestamp(),
            self.pv_status_text_alarm(),
            self.pv_status_text_metadata(),
            self.pv_status_text_alarm_limits(),
            self.pv_status_text_display_limits()])

    self.pv_message_in_a_box.setText(
        self.pv_status_text_header(source=source) + body)
    QApplication.processEvents()
    self.pv_message_in_a_box.exec()
def lookup_archiver(self):
    '''Open the archiver page of this pv with the desktop's URL handler.

    The URL is ``self.settings.urlArchiver`` followed by the pv name. When
    the URL cannot be opened and the widget provides ``showMessage``, an
    error is reported through it.
    '''
    url = self.settings.urlArchiver + self.pv_name
    if QDesktopServices.openUrl(QUrl(url)):
        return
    # Fixed: the guard tested a bare `showMessage` name although the call
    # goes through `self.showMessage`; none of the imports visible at the
    # top of the file binds such a name, so the failure path could not
    # report anything. Look the method up on the instance instead.
    show_message = getattr(self, "showMessage", None)
    if show_message is not None:
        # NOTE(review): MsgSeverity, __pymodule__ and _line are expected at
        # module scope; they are not visible from this section -- confirm.
        show_message(MsgSeverity.ERROR, __pymodule__, _line(),
                     "Failed to open URL {0}".format(url))
def lookup_databuffer(self):
    '''Open the databuffer page of this pv with the desktop's URL handler.

    The URL is ``self.settings.urlDatabuffer`` followed by the pv name.
    When the URL cannot be opened and the widget provides ``showMessage``,
    an error is reported through it. Pending Qt events are processed
    afterwards.
    '''
    url = self.settings.urlDatabuffer + self.pv_name
    if not QDesktopServices.openUrl(QUrl(url)):
        # Fixed: the guard tested a bare `showMessage` name although the
        # call goes through `self.showMessage`; none of the imports visible
        # at the top of the file binds such a name, so the failure path
        # could not report anything. Look the method up on the instance.
        show_message = getattr(self, "showMessage", None)
        if show_message is not None:
            # NOTE(review): MsgSeverity, __pymodule__ and _line are expected
            # at module scope; not visible from this section -- confirm.
            show_message(MsgSeverity.ERROR, __pymodule__, _line(),
                         "Failed to open URL {0}".format(url))
    QApplication.processEvents()
def strip_chart(self):
    '''Start the PShell strip chart for this pv as a detached process.'''
    program = "/sf/op/bin/strip_chart"
    channel_config = ('-config = [[[true,"' + self.pv_name
                      + '","Channel",1,1]]]')
    arguments = ["-nlaf", "-start", channel_config, "&"]
    QProcess.startDetached(program, arguments)
def display_parameters(self):
    '''Show a modal dialog for adjusting how the pv is displayed.

    The dialog is titled with the pv name and positioned relative to the
    right edge of the context menu. For float/double channels, unless the
    control data reports a precision of zero or less, it offers a spin box
    for the user precision (capped at the ioc precision, or 6 when no
    control data is available) and a button showing the ioc precision that
    resets to it. It always offers a combo box to select the refresh rate.
    '''
    dialog = QDialog(self)
    dialog_rect = dialog.geometry()
    menu_rect = self.context_menu.geometry()
    anchor = QPoint(
        menu_rect.x() + menu_rect.width() - dialog_rect.width(),
        menu_rect.y())
    dialog_rect.moveTo(dialog.mapToGlobal(anchor))
    dialog.setGeometry(dialog_rect)
    dialog.setWindowTitle(self.pv_name)
    layout = QVBoxLayout()

    # Precision controls are pointless when the ioc precision is <= 0.
    show_precision = not (self.pv_ctrl is not None
                          and self.pv_ctrl.precision <= 0)
    if self.cafe.getDataTypeNative(self.handle) in (
            self.cyca.CY_DBR_FLOAT, self.cyca.CY_DBR_DOUBLE) and show_precision:
        # Row: user precision
        user_row = QWidget()
        user_box = QHBoxLayout()
        user_label = QLabel("Precision (user):")
        self.precision_user_wgt = QSpinBox(self)
        self.precision_user_wgt.setFocusPolicy(Qt.NoFocus)
        self.precision_user_wgt.setValue(int(self.precision))
        max_precision = 6 if self.pv_ctrl is None else self.pv_ctrl.precision
        self.precision_user_wgt.setMaximum(max_precision)
        self.precision_user_wgt.valueChanged.connect(
            self.precision_user_changed)
        user_box.addWidget(user_label)
        user_box.addWidget(self.precision_user_wgt)
        user_row.setLayout(user_box)
        user_label.setFixedWidth(110)
        self.precision_user_wgt.setFixedWidth(35)
        user_row.setFixedWidth(160)

        # Row: ioc precision, click to reset the user precision to it
        ioc_row = QWidget()
        ioc_box = QHBoxLayout()
        ioc_label = QLabel("Precision (ioc): ")
        ioc_button = QPushButton(self)
        ioc_button.setText(" {} ".format(max_precision))
        ioc_button.clicked.connect(self.precision_ioc_reset)
        ioc_box.addWidget(ioc_label)
        ioc_box.addWidget(ioc_button)
        ioc_row.setLayout(ioc_box)
        ioc_label.setFixedWidth(110)
        ioc_button.setFixedWidth(20)
        ioc_row.setFixedWidth(145)

        layout.addWidget(user_row)
        layout.addWidget(ioc_row)

    # Row: refresh rate
    refresh_row = QWidget()
    refresh_box = QHBoxLayout()
    refresh_label = QLabel("Refresh rate: ")
    if self.notify_freq_hz_default <= 0:
        default_hz = 0
    else:
        default_hz = self.notify_freq_hz_default
    # Combo box index -> refresh frequency in Hz (0 means direct).
    self.refresh_freq_combox_idx_dict = {0:0, 1:10, 2:5, 3:2, 4:1, 5:0.5,
                                         6:default_hz}
    refresh_combo = QComboBox(self)
    refresh_combo.addItem('direct')
    for idx in range(1, 6):
        refresh_combo.addItem('{0} Hz'.format(
            self.refresh_freq_combox_idx_dict[idx]))
    if default_hz == 0:
        default_text = 'default (direct)'
    else:
        default_text = 'default ({0} Hz)'.format(
            self.refresh_freq_combox_idx_dict[6])
    refresh_combo.addItem(default_text)

    # Preselect the first entry that matches the current frequency.
    for idx, hz in self.refresh_freq_combox_idx_dict.items():
        if hz == self.notify_freq_hz:
            refresh_combo.setCurrentIndex(idx)
            break
    refresh_combo.currentIndexChanged.connect(self.refresh_rate_changed)

    refresh_box.addWidget(refresh_label)
    refresh_box.addWidget(refresh_combo)
    refresh_row.setLayout(refresh_box)
    refresh_label.setFixedWidth(110)
    refresh_combo.setFixedWidth(115)
    refresh_row.setFixedWidth(235)
    layout.addWidget(refresh_row)

    layout.setAlignment(Qt.AlignLeft)
    layout.setContentsMargins(10, 0, 0, 0)
    layout.setSpacing(0)
    dialog.setMinimumWidth(340)
    dialog.setLayout(layout)
    dialog.exec()
    QApplication.processEvents()
def precision_ioc_reset(self):
    '''Reset the user precision to the precision of the control data.

    When control data is available both ``precision_user`` and
    ``precision`` take its precision. The spin box is then set to the
    current precision unless that is None.
    '''
    ctrl = self.pv_ctrl
    if ctrl is not None:
        self.precision_user = ctrl.precision
        self.precision = ctrl.precision
    if self.precision is None:
        return
    self.precision_user_wgt.setValue(self.precision)
def precision_user_changed(self, new_value):
    '''Store the new user precision and re-emit the cached float value.

    Args:
        new_value: Precision chosen by the user.

    The cached pv data is emitted through ``trigger_monitor_float`` (value,
    status, alarm severity) only when its first element is a float.
    '''
    self.precision_user = new_value
    self.precision = new_value
    cached = self.cafe.getPVCache(self.handle)
    first = cached.value[0]
    if isinstance(first, float):
        self.trigger_monitor_float.emit(
            first, cached.status, cached.alarmSeverity)
def refresh_rate_changed(self, new_idx):
    '''Apply the refresh rate selected in the combo box.

    Args:
        new_idx: Combo box index, looked up in
            ``refresh_freq_combox_idx_dict`` to get the frequency in Hz.

    The notification period in milliseconds is 0 for a frequency of 0 and
    1000 / frequency otherwise. A widget that was notified in unison
    leaves that mode and has its monitor restarted; otherwise the policy
    of the running monitor is updated with the new period.
    '''
    freq_hz = self.refresh_freq_combox_idx_dict[new_idx]
    if freq_hz == 0:
        self.notify_milliseconds = 0
    else:
        self.notify_milliseconds = 1000 / freq_hz
    self.notify_freq_hz = freq_hz

    if not self.notify_unison:
        self.cafe.updateMonitorPolicyDeltaMS(
            self.handle, self.monitor_id, self.notify_milliseconds)
        return
    self.notify_unison = False
    self.monitor_stop()
    self.monitor_start()
#https://doc.qt.io/qt-5.9/qtwidgets-mainwindows-menus-example.html
#Since Qt5 this has to be implemented in order to prevent the 'Select All' dialog button from appearing.
def contextMenuEvent(self, event):
    '''Swallow the context menu event without doing anything.'''
    return None
def showContextMenu(self):
    '''Open the widget's context menu at the current cursor position.'''
    menu = self.context_menu
    menu.exec(QCursor.pos())
def mousePressEvent(self, event):
    '''Open the context menu on a right-button press.

    The menu is shown at the current cursor position; once it has been
    closed the widget gives up the keyboard focus. Other buttons are not
    acted upon.
    '''
    if event.button() != Qt.RightButton:
        return
    self.context_menu.exec(QCursor.pos())
    self.clearFocus()
def mouseReleaseEvent(self, event):
event.ignore()