Files
caqtwidgets/pvgateway.py

1710 lines
63 KiB
Python

"""
The module provides data and metadata of a process variable through
PyCafe.
"""
__author__ = 'Jan T. M. Chrin'
import copy
from enum import IntEnum
import inspect
import time
from distutils.version import LooseVersion
from qtpy.QtCore import (QEvent, QMutex, QPoint, QProcess, QSettings, Qt, QUrl,
Signal)
from qtpy.QtCore import __version__ as QT_VERSION_STR
from qtpy.QtGui import QCursor, QDesktopServices, QFont
from qtpy.QtWidgets import (QAction, QApplication, QComboBox, QDialog,
QHBoxLayout, QLabel, QMenu, QMessageBox,
QPushButton, QSpinBox, QVBoxLayout, QWidget)
def __LINE__():
return inspect.currentframe().f_back_f_lineno
class DAQState(IntEnum):
    """Integer codes for the state of the data acquisition.

    Each member name combines a source prefix (``BS`` or ``CA``) with either
    no suffix, ``_STOP`` or ``_PAUSE``; values increase in steps of ten.
    """

    # Plain source states
    BS = 10
    CA = 20

    # Stopped variants
    BS_STOP = 30
    CA_STOP = 40

    # Paused variants
    BS_PAUSE = 50
    CA_PAUSE = 60
class PVGateway(QWidget):
"""Retrieves pv metadata through PyCafe.
The PVGateway class when subclassed by Qt widgets enables their
connectivity to channel access.
Attributes:
monitor_id: (int) Monitor id
units : (str) Units associated with the pv
trigger_monitor_<datatype> is the signal triggered by updates
arising from monitored pvs.
trigger_connect is the signal triggered from changes in pv
connection status.
widget_handle_dict is a dictionary mapping widgets to their pv
handle.
A pv handle may be associated to more than one widget.
"""
# Monitor signals. py_monitor_callback emits one of the typed variants as
# (value, status, alarm severity), chosen by the type of the first value.
trigger_monitor_float = Signal(float, int, int)
trigger_monitor_int = Signal(int, int, int)
trigger_monitor_str = Signal(str, int, int)
# Emitted instead of the typed variants when monitor_dbr_time is set.
trigger_monitor = Signal(object, int) #pvdata, alarm severity
# Emitted as (handle, pv name, connection status).
trigger_connect = Signal(int, str, int)
# DAQ signals. __init__ connects all three to receive_daq_update, whose
# parameters are (daq_pvd, daq_mode, daq_state).
trigger_daq = Signal(object, str, int)
trigger_daq_int = Signal(object, str, int)
trigger_daq_str = Signal(object, str, int)
#Properties, user supplied
#(the only values __init__ accepts for its color_mode argument)
ACT_ON_BEAM = 'actOnBeam'
NOT_ACT_ON_BEAM = 'notActOnBeam'
READBACK_ALARM = 'alarm'
READBACK_STATIC = 'static'
#Properties, dynamic
#(names of the Qt dynamic properties toggled by _qt_dynamic_property_set)
DISCONNECTED = 'disconnected'
ALARM_SEV_MINOR = 'alarmSevMinor'
ALARM_SEV_MAJOR = 'alarmSevMajor'
ALARM_SEV_INVALID = 'alarmSevInvalid'
#The no-alarm state shares the property string of READBACK_ALARM
ALARM_SEV_NO_ALARM = READBACK_ALARM
DAQ_STOPPED = 'stopped'
DAQ_PAUSED = 'paused'
#ObjectName, defined by CAQ<Widget>
#(keys of the qt_object_to_property dictionary built in __init__)
PV_CONTROLLER = "Controller"
PV_READBACK = "Readback"
PV_DAQ_BS = "BSRead"
PV_DAQ_CA = "CARead"
#Default group name used for handle lookups of pvs within the DAQ group
_DAQ_CAFE_SG_NAME = "gBS2CA"
#Not referenced in the part of the file visible here
_alarm_severity_record_types = ["ai", "ao", "calc", "calcout", "dfanout",
"longin", "longout", "pid", "sel",
"steppermotor", "sub"]
#parent is Gui
def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
pv_within_daq_group: bool = False, color_mode=None,
show_units: bool = False, prefix: str = "", suffix: str = "",
connect_callback=None, msg_label: str = "",
connect_triggers: bool = True, notify_freq_hz: int = 0,
notify_unison: bool = False, precision: int = 0,
monitor_dbr_time: bool = False):
"""Set up the gateway state, signal connections and context menu.

Args:
    parent: Object providing the ``settings``, ``cafe`` and ``cyca``
        attributes read below. When None the constructor returns right
        after ``super().__init__()``.
    pv_name: Name of the process variable. When empty the constructor
        returns early as well.
    monitor_callback: Callback used by monitor_start in place of
        py_monitor_callback when not None.
    pv_within_daq_group: When true the DAQ signals are connected to
        receive_daq_update and initialize() first looks the pv up
        within the DAQ group.
    color_mode: Kept only if it equals ACT_ON_BEAM, NOT_ACT_ON_BEAM,
        READBACK_ALARM or READBACK_STATIC; otherwise None is stored.
    show_units: Whether format_display_value appends the units.
    prefix: Text placed before the formatted value.
    suffix: Text placed after the formatted value.
    connect_callback: Connection callback; initialize() substitutes
        py_connect_callback when this is None.
    msg_label: Stored label; receive_connect_update displays it in
        place of the cached pv value when it is non-empty.
    connect_triggers: When true (and the pv is not within the DAQ
        group) the monitor signals are connected to the default slots.
    notify_freq_hz: Notification frequency; its absolute value is
        stored and converted to ``notify_milliseconds``.
    notify_unison: Only effective when notify_freq_hz is non-zero.
    precision: User precision; 0 means "not given" and the pv
        precision is used instead.
    monitor_dbr_time: When true py_monitor_callback emits
        trigger_monitor with the whole pvdata object.

NOTE(review): after either early return none of the attributes set
further down exist, so most other methods would raise AttributeError
on such an instance - confirm callers never use one.
"""
super().__init__()
if parent is None:
return
if not pv_name:
return
self.connect_callback = connect_callback
self.notify_freq_hz = abs(notify_freq_hz)
self.notify_freq_hz_default = self.notify_freq_hz
#Notification period in ms; 0 when no frequency was requested
self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \
1000 / self.notify_freq_hz
self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz)
#NOTE(review): this instance attribute hides the parent() method that
#QWidget provides - confirm nothing relies on self.parent()
self.parent = parent
self.settings = self.parent.settings
self.pv_name = pv_name
self.color_mode = None
self.check_rtyp = False
#Only the four user-supplied property names are accepted
if color_mode is not None:
if color_mode in (self.ACT_ON_BEAM,
self.NOT_ACT_ON_BEAM,
self.READBACK_ALARM,
self.READBACK_STATIC):
self.color_mode = color_mode
self.color_mode_requested = self.color_mode
#if 'READ' in self.pv_name:
#print('color mode',self.pv_name, self.color_mode,
#self.color_mode_requested )
if monitor_callback is not None:
self.monitor_callback = monitor_callback
else:
self.monitor_callback = None
self.pv_within_daq_group = pv_within_daq_group
self.show_units = show_units
self.prefix = prefix
self.suffix = suffix
self.cafe = self.parent.cafe
self.cyca = self.parent.cyca
self.url_archiver = None
self.url_databuffer = None
#URLs and style-guide colours come from the parent settings if present
if self.parent.settings is not None:
self.url_archiver = self.parent.settings.data["url"]["archiver"]
if "databuffer" in self.parent.settings.data["url"]:
self.url_databuffer = self.parent.settings.data["url"]["databuffer"]
self.bg_readback = self.parent.settings.data["StyleGuide"][
"bgReadback"]
self.fg_alarm_major = self.parent.settings.data["StyleGuide"][
"fgAlarmMajor"]
self.fg_alarm_minor = self.parent.settings.data["StyleGuide"][
"fgAlarmMinor"]
self.fg_alarm_invalid = self.parent.settings.data["StyleGuide"][
"fgAlarmInvalid"]
self.fg_alarm_noalarm = self.parent.settings.data["StyleGuide"][
"fgAlarmNoAlarm"]
else:
#Fallback URLs; no style-guide colours are assigned in this branch
#self.settings = ReadJSON(self.parent.appname)
self.url_archiver = ("https://data-ui.psi.ch/preselect?c1=" +
"sf-archiverappliance/")
self.url_databuffer \
= "https://data-ui.psi.ch/preselect?c1=sf-databuffer/"
self.daq_group_name = self._DAQ_CAFE_SG_NAME
self.desc = None
self.handle = None
self.initialize_complete = False
self.initialize_again = False
self.msg_label = msg_label
self.msg_press_value = None
self.msg_release_value = None
self.monitor_id = None
self.monitor_dbr_time = monitor_dbr_time
self.mutex_post_display = QMutex()
self.mutex = QMutex()
self.precision_user = precision
#A precision of 0 counts as "not supplied by the user"
self.has_precision_user = bool(precision)
self.precision_pv = 3
self.precision = (self.precision_user if self.has_precision_user else
self.precision_pv)
self.pvd = None
self.pv_ctrl = None
self.pv_info = None
self.record_type = None
#if 'show_log_message' in dir(self.parent):
# self.show_log_message = self.parent.show_log_message
#else:
# self.show_log_message = None
self.qt_object_name = None
#Per object name: the dynamic properties that may be toggled
self.qt_property_controller = {
self.DISCONNECTED: False,
self.ACT_ON_BEAM: False, self.NOT_ACT_ON_BEAM: False
}
self.qt_property_readback = {
self.DISCONNECTED: False,
self.READBACK_ALARM: False, self.READBACK_STATIC: False,
self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
self.ALARM_SEV_INVALID: False
}
self.qt_property_daq_bs = {
self.DISCONNECTED: False,
self.READBACK_ALARM: False, self.READBACK_STATIC: False,
self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
self.ALARM_SEV_INVALID: False,
self.DAQ_STOPPED: False, self.DAQ_PAUSED: False
}
self.qt_property_daq_ca = {
self.DISCONNECTED: False,
self.READBACK_ALARM: False, self.READBACK_STATIC: False,
self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
self.ALARM_SEV_INVALID: False,
self.DAQ_STOPPED: False, self.DAQ_PAUSED: False
}
self.qt_object_to_property = {
self.PV_CONTROLLER: self.qt_property_controller,
self.PV_READBACK: self.qt_property_readback,
self.PV_DAQ_BS: self.qt_property_daq_bs,
self.PV_DAQ_CA: self.qt_property_daq_ca
}
#Working copy, filled by qt_property_initial_values
self._qt_property_selected = {}
self.status_tip = None
self.suggested_text = ""
self.time_monotonic = time.monotonic()
self.pvd_previous = None
self.timeout = 0.2
self.units = ""
self.widget = self
#Last dotted component of the class path, i.e. the bare class name
_widget_name_part = str(self.widget.__class__).split("\'")[1].split(".")
#_widget_class_part = _widget_name_part[1].split(".")
self.widget_class = _widget_name_part[len(_widget_name_part)-1]
if pv_within_daq_group:
self.trigger_daq_int.connect(self.receive_daq_update)
self.trigger_daq.connect(self.receive_daq_update)
self.trigger_daq_str.connect(self.receive_daq_update)
elif connect_triggers:
self.trigger_monitor.connect(self.receive_monitor_dbr_time)
self.trigger_monitor_str.connect(self.receive_monitor_update)
self.trigger_monitor_int.connect(self.receive_monitor_update)
self.trigger_monitor_float.connect(self.receive_monitor_update)
self.trigger_connect.connect(self.receive_connect_update)
#Context menu with the pv-related actions
self.context_menu = QMenu()
self.context_menu.setObjectName("contextMenu")
self.context_menu.setWindowModality(Qt.NonModal) #ApplicationModal
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.0"):
self.context_menu.addSection("PV: {0}".format(self.pv_name))
action1 = QAction("Text Info", self)
action1.triggered.connect(self.pv_status_text)
self.context_menu.addAction(action1)
action2 = QAction("Lookup in Archiver", self)
action2.triggered.connect(self.lookup_archiver)
self.context_menu.addAction(action2)
if self.url_databuffer is not None:
action3 = QAction("Lookup in Databuffer", self)
action3.triggered.connect(self.lookup_databuffer)
self.context_menu.addAction(action3)
action4 = QAction("Strip Chart (PShell)", self)
action4.triggered.connect(self.strip_chart)
self.context_menu.addAction(action4)
action6 = QAction("Configure Display Parameters", self)
action6.triggered.connect(self.display_parameters)
action5 = QAction("Reconnect: {0}".format(self.pv_name), self)
action5.triggered.connect(self.reconnect_channel)
_font = QFont()
_font.setPixelSize(12)
action5.setFont(_font)
if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
self.context_menu.addSection("")
#return action6 and 5 code here eventually
self.context_menu.addAction(action6)
self.context_menu.addAction(action5)
#Non-modal message box, given the pv name as title by initialize()
self.pv_message_in_a_box = QMessageBox()
self.pv_message_in_a_box.setObjectName("pvinfo")
#Qt.ApplicationModal not used as it blocks input to all windows
self.pv_message_in_a_box.setWindowModality(Qt.NonModal)
self.pv_message_in_a_box.setIcon(QMessageBox.Information)
self.pv_message_in_a_box.setStandardButtons(QMessageBox.Close)
self.pv_message_in_a_box.setDefaultButton(QMessageBox.Close)
self.initialize()
#The __init__ method of a class is used to initialize new objects,
#not create them. As such, it should not return any value.
return #self # used by pvgateway in CAQStripChart
def initialize(self):
'''Initialize class attributes and connect to ca if required.

Obtains the cafe handle for ``self.pv_name`` (from the DAQ group when
``pv_within_daq_group`` is set, else via getHandleFromPV or by opening
the channel), registers this widget with cafe.addWidget, emits
trigger_connect for handles that already exist, then calls
initialize_meta_data() and titles the message box with the pv name.
'''
_handle_within_group_flag = False
if self.pv_within_daq_group:
self.handle = self.cafe.getHandleFromPVWithinGroup(
self.pv_name, self.daq_group_name)
if self.handle > 0:
self.cafe.addWidget(self.handle, self.widget)
_handle_within_group_flag = True
#Callback already invoked to emit signal here!!
_channel_info = self.cafe.getChannelInfo(self.handle)
#wgts = self.cafe.getWidgets(self.handle)
self.trigger_connect.emit(
int(self.handle), str(self.pv_name),
int(_channel_info.cafeConnectionState))
#In case user is misinformed
#(i.e. the pv was not found within the DAQ group after all)
if not _handle_within_group_flag:
self.handle = self.cafe.getHandleFromPV(self.pv_name)
#Fall back on the gateway's own connection callback
if self.connect_callback is None:
self.connect_callback = self.py_connect_callback
if self.handle > 0:
#The second time round, widget is gateway rather than parent,
#Why is that?
self.cafe.setPyConnectCallbackFn(self.handle,
self.connect_callback)
self.cafe.addWidget(self.handle, self.widget)
_channel_info = self.cafe.getChannelInfo(self.handle)
self.trigger_connect.emit(
self.handle, self.pv_name,
int(_channel_info.cafeConnectionState))
else:
#No handle yet: open the channel, waiting up to self.timeout
self.cafe.openPrepare()
self.handle = self.cafe.open(self.pv_name,
self.connect_callback)
self.cafe.addWidget(self.handle, self.widget)
self.cafe.openNowAndWait(self.timeout, self.handle)
self.initialize_meta_data()
self.pv_message_in_a_box.setWindowTitle(self.pv_name)
def initialize_meta_data(self):
'''Cache the pv metadata and derive the widget sizing strings.

When the channel is connected and its initial callback has completed,
the cached pv data, control data, channel info and record type are
stored on the instance. ``self.suggested_text`` and
``self.max_control_abs_str`` are then built from prefix, display
limits or enum strings, precision, units and suffix. While connected
both strings are written to QSettings under
Widget/<pv_name>/<widget_class>; otherwise previously stored values,
if any, are read back from there.
'''
_current_value = ""
if self.cafe.isConnected(self.handle) and \
self.cafe.initCallbackComplete(self.handle):
if self.pvd is None:
self.pvd = self.cafe.getPVCache(self.handle)
if self.pv_ctrl is None:
self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
self.set_precision_and_units()
if self.pv_info is None:
self.pv_info = self.cafe.getChannelInfo(self.pv_name)
#check_rtyp is set to False in __init__, so the .RTYP lookup is
#skipped unless that flag is changed elsewhere
if "Not Supported" in self.pv_info.className and self.check_rtyp:
_rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
self.record_type = _rtype if _rtype is not None else \
self.pv_info.className
_rtype = self.cafe.close(self.pv_name.split(".")[0] +
".RTYP")
else:
self.record_type = self.pv_info.className
_current_value = self.cafe.getCache(self.handle)
if isinstance(_current_value, (int, float)):
#space for positive numbers
_value_form = ("{:<+.%sf}" % self.precision)
_current_value = _value_form.format(
round(_current_value, self.precision))
#Reset
self.initialize_complete = True
#verify user input
#(units are not shown twice when the suffix already equals them)
if self.show_units is True:
if self.suffix == self.units and self.units != "":
self.show_units = False
self.suggested_text = self.prefix
if self.prefix:
self.suggested_text += " "
_suggested_text_from_value = " "
_max_control_abs = 0
if self.pv_ctrl is not None:
#DisplayLimit preferred over ControlLimit as latter n/a for ao
_lower_control_abs = abs(int(self.pv_ctrl.lowerDisplayLimit))
_upper_control_abs = abs(int(self.pv_ctrl.upperDisplayLimit))
_max_control_abs = max(_lower_control_abs, _upper_control_abs)
#NOTE(review): max() of two ints is never None, so this check looks
#unreachable - confirm before removing
if _max_control_abs is None:
_max_control_abs = 0
_enum_list = self.pv_ctrl.enumStrings
if _enum_list:
#Use the longest enum string as the widest expected text
_enum_list_member_max_length = 0
_enum_list_member_max_index = 0
for i in range(0, len(_enum_list)):
if len(_enum_list[i]) > _enum_list_member_max_length:
_enum_list_member_max_length = len(_enum_list[i])
_enum_list_member_max_index = i
_suggested_text_from_value += \
_enum_list[_enum_list_member_max_index] + "."
else:
#NOTE(review): the sign test reads lowerControlLimit although the
#magnitude above comes from the display limits - confirm intended
if self.pv_ctrl.lowerControlLimit < 0:
_suggested_text_from_value += "-"
_suggested_text_from_value += str(_max_control_abs) + "."
self.precision = min(9, self.precision) #safety net
#One "0" per decimal place
for i in range(0, self.precision):
_suggested_text_from_value += "0"
#Prefer the formatted current value when it is the longer text
if len(_current_value) > len(_suggested_text_from_value):
_suggested_text_from_value = _current_value
self.suggested_text += _suggested_text_from_value
if self.show_units:
self.suggested_text += " " + self.units
self.suggested_text += self.suffix
#Pad by two characters in total
_suggested_text_length = len(self.suggested_text)
self.suggested_text = self.suggested_text.center(
_suggested_text_length+2)
self.max_control_abs_str = str(_max_control_abs)
_max_control_abs_length = len(self.max_control_abs_str)
_offset = 9
self.max_control_abs_str = self.max_control_abs_str.center(
_max_control_abs_length + _offset)
#Persist (connected) or restore (not connected) the two strings
qsettings = QSettings()
qsettings.beginGroup("Widget")
qsettings.beginGroup(self.pv_name)
qsettings.beginGroup(self.widget_class)
_var_text = "suggested_text"
_ctrl_abs = "max_control_abs_str"
if self.cafe.isConnected(self.handle) and \
self.cafe.initCallbackComplete(self.handle):
qsettings.setValue(_var_text, self.suggested_text)
qsettings.setValue(_ctrl_abs, self.max_control_abs_str)
else:
if qsettings.value(_var_text) is not None:
self.suggested_text = qsettings.value(_var_text)
if qsettings.value(_ctrl_abs) is not None:
self.max_control_abs_str = qsettings.value(_ctrl_abs)
qsettings.endGroup()
qsettings.endGroup()
qsettings.endGroup()
def is_initialize_complete(self, max_attempts: int = 6,
                           poll_interval: float = 0.01):
    """Poll until the pv metadata has been initialised.

    Re-runs ``initialize_meta_data`` up to ``max_attempts`` times, pausing
    ``poll_interval`` seconds before each attempt, and stops as soon as
    ``self.initialize_complete`` is set.

    Args:
        max_attempts: Maximum number of retries. The default of 6 is the
            number of attempts the previously hard-coded loop performed.
        poll_interval: Pause in seconds before each retry.

    Returns:
        True when initialisation has completed, otherwise False.
    """
    for _ in range(max_attempts):
        if self.initialize_complete:
            return True
        time.sleep(poll_interval)
        self.initialize_meta_data()
    # Re-check after the last attempt: previously a success on the final
    # retry was still reported as a failure.
    return bool(self.initialize_complete)
def cleanup(self, close_pv=True):
'''Clean up the widget.

Stops this widget's monitor if cafe still lists it for the handle,
may close the channel (see the note below) and schedules the widget
for deletion with deleteLater().

Args:
    close_pv: Closing the channel is only considered when this is True.
'''
#Make sure mon id is valid
if self.handle > 0:
_monID_list = self.cafe.getMonitorIDs(self.handle)
if self.monitor_id in _monID_list:
self.cafe.monitorStop(self.handle, self.monitor_id)
#Do not close if there are other monitors
#NOTE(review): the condition below closes the channel when the value
#returned by getNoMonitors is > 0, which reads as the opposite of the
#comment above - confirm the semantics of getNoMonitors
if self.cafe.getNoMonitors(self.handle) > 0:
if close_pv is True:
self.cafe.close(self.pv_name)
self.widget.deleteLater()
def format_display_value(self, value):
    """Return ``value`` as the text to be shown by the widget.

    Strings are used as they are, integers are converted with ``str`` and
    any other value is rounded and formatted with ``self.precision``
    decimal places (a leading space stands in for the sign of positive
    numbers). Units, suffix and prefix are then added as configured.

    Args:
        value: The pv value (str, int or float), or None.

    Returns:
        The formatted text. For ``None`` a diagnostic line is printed and
        an empty string is returned; previously ``None`` reached
        ``round()`` and raised TypeError.
    """
    if value is None:
        print(self, self.pv_name, ">>>>format_display_value is None")
        return ""

    if isinstance(value, str):
        text = value
    elif isinstance(value, int):
        text = str(value)
    else:
        template = "{:< .%sf}" % self.precision
        text = template.format(round(value, self.precision))

    if self.show_units:
        text += " " + self.units + " "
    if self.suffix:
        text += " " + self.suffix + " "
    if self.prefix:
        # An extra space separates the prefix from a value that may carry
        # a minus sign (lower display limit below zero).
        gap = ""
        if self.pv_ctrl is not None and self.pv_ctrl.lowerDisplayLimit < 0:
            gap = " "
        text = self.prefix + gap + text
    return text
def post_display_value(self, value):
    """Format ``value`` and show it through the widget's ``setText``.

    For Qt >= 5.3 the widget's signals are blocked while the text is set.
    The previous blocked state is restored afterwards, also when
    ``setText`` raises; before, signals were always re-enabled, even if
    they had been blocked by the caller. Widgets without a ``setText``
    method only get a diagnostic printout.

    Args:
        value: The value handed to ``format_display_value``.
    """
    _value_str = self.format_display_value(value)
    if not hasattr(self, "setText"):
        print("setText method does not exist for this widget class:\n",
              self.widget.__class__)
        print("sender was: ", self.sender())
        return

    if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
        # blockSignals returns the state that was in force before the call.
        was_blocked = self.blockSignals(True)
        try:
            self.setText(_value_str)
        finally:
            self.blockSignals(was_blocked)
    else:
        self.setText(_value_str)
def py_connect_callback(self, handle, pvname, status):
'''Callback function to be invoked on change of pv connection status.
Checks for existence of widget. Waits up to a maximun of 100 ms.
'''
pv_name = pvname
self.trigger_connect.emit(int(handle), str(pv_name), int(status))
def receive_connect_update(self, handle, pv_name, status,
post_display=True):
'''Triggered by connect signal. For Widget to overload.

Args:
    handle: Handle passed with the signal; used for the cache read.
    pv_name: Name passed with the signal; a mismatch with
        ``self.pv_name`` is only reported with a printout.
    status: Connection status, compared with ``cyca.ICAFE_CS_CONN``
        and ``cyca.ICAFE_CS_CLOSED``.
    post_display: When true the value (or ``msg_label``) is displayed
        on connection.
'''
if pv_name is not None:
if pv_name != self.pv_name:
print(("pv_name {0} in receive_connect_update " +
"does not match: {1}").format(pv_name, self.pv_name))
if status == self.cyca.ICAFE_CS_CONN:
#NOTE(review): initialize_connect is not among the attributes set in
#__init__; it first appears here
self.initialize_connect = True
#Refresh the cached metadata for the (re)connected channel
self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
self.pv_info = self.cafe.getChannelInfo(self.handle)
if self.pv_info is not None and self.record_type is None:
if "Not Supported" in self.pv_info.className and self.check_rtyp:
_rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
self.record_type = _rtype if _rtype is not None else \
self.pv_info.className
_rtype = self.cafe.close(self.pv_name.split(".")[0] +
".RTYP")
else:
self.record_type = self.pv_info.className
self.set_precision_and_units(reconnectFlag=True)
#A message label, when given, is displayed instead of the pv value
if not self.msg_label:
_value = self.cafe.getCache(handle, dt='native')
#Another reconnection in progress!!!
if _value is None:
return
else:
_value = self.msg_label
if post_display:
self.post_display_value(_value)
self.qt_property_reconnect()
else:
self.qt_property_disconnect()
#A closed channel is flagged so that its monitor is restarted later
if status == self.cyca.ICAFE_CS_CLOSED:
self.initialize_again = True
elif self.initialize_again:
#monitor_id informs whether or not widget has a monitor
#CAQMessageButton for instance does not have a monitor
if not self.pv_within_daq_group and self.monitor_id is not None:
self.monitor_start()
self.initialize_again = False
return
def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
''' DAQ mode is widget specific.
DAQ may be in BS mode, but channels within DAQ stream that
are not BS enabled will be flagged as CA Mode, i.e., CARead

Args:
    daq_pvd: pv data object; its ``alarmSeverity``, ``value`` and
        ``status`` attributes are read and it is stored as ``self.pvd``.
    daq_mode: Compared with ``PV_DAQ_BS`` / ``PV_DAQ_CA`` and applied
        as the Qt object name when it changes.
    daq_state: Compared with ``cyca.ICAFE_DAQ_STOPPED``,
        ``cyca.ICAFE_DAQ_PAUSED`` and ``cyca.ICAFE_DAQ_RUN``.
'''
_current_qt_dynamic_property = self.qt_dynamic_property_get()
alarm_severity = daq_pvd.alarmSeverity
self.pvd = daq_pvd
#A change of DAQ mode changes the object name, hence the style
if daq_mode != self.qt_object_name:
self.qt_object_name = daq_mode
self.setObjectName(self.qt_object_name)
self.qt_style_polish()
#Dynamic properties are only re-applied when they actually change
if daq_state in (self.cyca.ICAFE_DAQ_STOPPED,):
if _current_qt_dynamic_property != self.DAQ_STOPPED:
self.qt_property_daq_stopped()
elif daq_state in (self.cyca.ICAFE_DAQ_PAUSED,):
if _current_qt_dynamic_property != self.DAQ_PAUSED:
self.qt_property_daq_paused()
elif daq_state in (self.cyca.ICAFE_DAQ_RUN,):
if daq_mode == self.PV_DAQ_BS and \
_current_qt_dynamic_property != self.READBACK_STATIC:
self.qt_property_static()
elif daq_mode == self.PV_DAQ_CA:
if self.color_mode != self.color_mode_requested:
self.color_mode = self.color_mode_requested
#Enum channels delivering an int are displayed by their string
if self.cafe.isEnum(self.handle) and \
isinstance(daq_pvd.value[0], int):
_value = self.cafe.getStringFromEnum(self.handle,
daq_pvd.value[0])
else:
_value = daq_pvd.value[0]
if daq_pvd.status == self.cyca.ICAFE_NORMAL:
if self.msg_label == "":
self.post_display_value(_value)
if daq_mode == self.PV_DAQ_BS:
return
#Check if color settings are correct
if alarm_severity > self.cyca.SEV_NO_ALARM:
self.color_mode = self.READBACK_ALARM
self.color_mode_requested = self.READBACK_ALARM
if self.color_mode == self.READBACK_ALARM:
if alarm_severity == self.cyca.SEV_MINOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
self.qt_property_alarm_sev_minor()
elif alarm_severity == self.cyca.SEV_MAJOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
self.qt_property_alarm_sev_major()
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != \
self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
elif alarm_severity == self.cyca.SEV_NO_ALARM:
if _current_qt_dynamic_property != \
self.ALARM_SEV_NO_ALARM:
self.qt_property_alarm_sev_no_alarm()
elif _current_qt_dynamic_property != self.READBACK_STATIC:
self.qt_property_static()
else:
if _current_qt_dynamic_property != self.DISCONNECTED:
self.qt_property_disconnect()
def receive_monitor_dbr_time(self, pvdata, alarm_severity):
    """Default slot for ``trigger_monitor``.

    Prints the pv name and alarm severity, then calls ``pvdata.show()``.
    """
    pv_name = self.pv_name
    print("called from gateway", pv_name, alarm_severity, flush=True)
    pvdata.show()
def receive_monitor_update(self, value, status, alarm_severity):
'''Triggered by monitor signal. For Widget to overload.

Args:
    value: Monitored value, displayed when ``status`` equals
        ``cyca.ICAFE_NORMAL`` and no ``msg_label`` is set.
    status: Status delivered with the monitor update.
    alarm_severity: Severity compared with the ``cyca.SEV_*`` values
        to select the dynamic property.

NOTE(review): mutex_post_display is released only by the last
statement; an exception raised in between would leave it locked.
'''
self.mutex_post_display.lock()
_current_qt_dynamic_property = self.qt_dynamic_property_get()
if status == self.cyca.ICAFE_NORMAL:
if self.msg_label == "":
self.post_display_value(value)
#For DAQ when channel connects after application start-up
if _current_qt_dynamic_property == self.DISCONNECTED:
self.qt_property_initial_values(qt_object_name=self.PV_READBACK)
#Check if color settings are correct
elif _current_qt_dynamic_property == self.READBACK_STATIC:
if alarm_severity > self.cyca.SEV_NO_ALARM:
if alarm_severity < self.cyca.SEV_INVALID:
self.color_mode = self.READBACK_ALARM
self.status_tip = ("Widget color mode is dynamic, " +
"pv with alarm limits")
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
#Properties are only re-applied when the severity state changes
if self.color_mode == self.READBACK_ALARM:
if alarm_severity == self.cyca.SEV_MINOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
self.qt_property_alarm_sev_minor()
elif alarm_severity == self.cyca.SEV_MAJOR:
if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
self.qt_property_alarm_sev_major()
elif alarm_severity == self.cyca.SEV_INVALID:
if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
self.qt_property_alarm_sev_invalid()
elif alarm_severity == self.cyca.SEV_NO_ALARM:
if _current_qt_dynamic_property != self.ALARM_SEV_NO_ALARM:
self.qt_property_alarm_sev_no_alarm()
else:
if _current_qt_dynamic_property != self.DISCONNECTED:
self.qt_property_disconnect()
self.mutex_post_display.unlock()
def py_monitor_callback(self, handle, pvname, pvdata):
    '''Callback function to be invoked on change of pv value.

    cafe.getCache and cafe.set operations permitted within callback.

    Stores ``pvdata`` as ``self.pvd`` and forwards the update through one
    of the monitor signals: ``trigger_monitor`` (whole pvdata object) when
    ``monitor_dbr_time`` is set, otherwise the str/int/float variant that
    matches the type of the first value.

    ``self.mutex`` is held for the duration of the call and is now always
    released: the early return and any exception used to leave it locked.
    A pv reporting ``ICAFE_CS_NEVER_CONN`` is re-initialised and nothing
    is emitted for it; previously that path reached the emit with the
    alarm severity unassigned.

    Args:
        handle: Handle of the channel (used in the diagnostic printout).
        pvname: Name of the process variable.
        pvdata: Data object providing ``status``, ``alarmSeverity`` and
            ``value``.
    '''
    self.mutex.lock()
    try:
        if not hasattr(self, 'cafe'):
            print("py_monitor_callback: name/handle self cafe is NONE",
                  pvname, handle)
            return

        self.pvd = pvdata
        status = pvdata.status
        if status == self.cyca.ICAFE_CS_NEVER_CONN:
            print("initialize again")
            self.initialize()
            return

        # A dropped connection is reported in place of the alarm severity.
        if status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
            alarm_severity = self.cyca.ICAFE_CA_OP_CONN_DOWN
        else:
            alarm_severity = pvdata.alarmSeverity

        if self.monitor_dbr_time:
            self.trigger_monitor.emit(pvdata, alarm_severity)
        elif isinstance(pvdata.value[0], str):
            self.trigger_monitor_str.emit(pvdata.value[0], status,
                                          alarm_severity)
        elif isinstance(pvdata.value[0], int):
            self.trigger_monitor_int.emit(pvdata.value[0], status,
                                          alarm_severity)
        else:
            self.trigger_monitor_float.emit(float(pvdata.value[0]), status,
                                            alarm_severity)
    finally:
        self.mutex.unlock()
def monitor_start(self):
    '''Initiate monitor on pv.

    Nothing happens unless the handle is positive. In unison mode the
    monitor is started without a callback; otherwise the user-supplied
    ``monitor_callback`` is used, falling back on ``py_monitor_callback``.
    The id returned by cafe is kept in ``self.monitor_id``.
    '''
    if not (self.handle > 0):
        return

    #Is monitor in waiting - now deleted with monitor_stop
    if self.notify_unison:
        self.monitor_id = self.cafe.monitorStart(
            self.handle, dbr=self.cyca.CY_DBR_TIME)
        return

    # Gateway-supplied handler unless the user provided one
    if self.monitor_callback is None:
        callback = self.py_monitor_callback
    else:
        callback = self.monitor_callback
    self.monitor_id = self.cafe.monitorStart(
        self.handle, cb=callback,
        dbr=self.cyca.CY_DBR_TIME,
        notify_milliseconds=self.notify_milliseconds)
def monitor_stop(self):
    """Stop this widget's monitor, whether running or still in waiting.

    Nothing happens unless the handle is positive and ``self.monitor_id``
    is among the ids cafe reports for the handle.
    """
    if not (self.handle > 0):
        return
    active_ids = self.cafe.getMonitorIDs(self.handle)
    waiting_ids = self.cafe.getMonitorIDsInWaiting(self.handle)
    if self.monitor_id in active_ids + waiting_ids:
        self.cafe.monitorStop(self.handle, self.monitor_id)
    #Is monitor in waiting?
    #remove monitors in waiting - to do
def reconnect_channel(self):
    """Ask cafe to reconnect the channel behind this widget's handle."""
    handles = [self.handle]  # cafe.reconnect is given a list of handles
    self.cafe.reconnect(handles)
def set_desc(self):
    '''Set description of pv from pv.DESC

    The description held by cafe for the handle is preferred. If there is
    none and no description is stored yet, the handle is supplemented and
    checked again. As a last resort the ``<record>.DESC`` channel is read
    directly; a channel opened for that purpose is closed again.
    '''
    cafe = self.cafe
    if cafe.hasDescription(self.handle):
        self.desc = cafe.getDescription(self.handle)
        return
    if self.desc is not None:
        return

    cafe.supplementHandle(self.handle)
    if cafe.hasDescription(self.handle):
        self.desc = cafe.getDescription(self.handle)
    if self.desc is not None:
        return

    ###Back-up solution
    # Replace the field part of the pv name, if any, by .DESC
    dot_index = str(self.pv_name).find(".")
    if dot_index == -1:
        desc_pv = self.pv_name + ".DESC"
    else:
        desc_pv = str(self.pv_name)[0:dot_index] + ".DESC"

    desc_handle = cafe.getHandleFromPVName(desc_pv)
    opened_here = desc_handle == 0
    if opened_here:
        cafe.openPrepare()
        desc_handle = cafe.open(desc_pv)
        cafe.openNowAndWait(self.timeout, desc_handle)
        time.sleep(0.001)

    if cafe.isConnected(desc_handle):
        self.desc = cafe.getCache(desc_handle, 'str')
        if self.desc is None:
            self.desc = cafe.get(desc_handle, 'str')
    else:
        self.desc = None

    # Only close what this method opened itself
    if opened_here:
        cafe.close(desc_handle)
def set_precision_and_units(self, reconnectFlag: bool = False):
'''Set the pv precision and units.

Args:
    reconnectFlag: When True the control data is fetched again from
        the cafe cache even if already held, and the ``show_units``
        setting is re-checked against the suffix.
'''
if self.pv_ctrl is None or reconnectFlag is True:
self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
if self.pv_ctrl is not None:
#A user-supplied precision takes priority over that of the pv
if not self.has_precision_user:
self.precision = self.pv_ctrl.precision
if self.pv_ctrl.units is not None:
self.units = str(self.pv_ctrl.units)
else:
self.units = ""
if reconnectFlag is True:
#verify user input
#(units are not shown in addition to an identical suffix)
if self.show_units is True and self.suffix is not None:
if self.suffix == self.units:
self.show_units = False
def _qt_readback_color_mode(self):
'''Color mode is determined from CAFE and depends on whether the pv:
has alarm limits (self.color_mode = 'readbackAlarm')
or is without alarm limits (self.color_mode = 'readbackStatic')
'''
#Already set by user
if self.color_mode is self.READBACK_ALARM:
return
if self.cafe.isConnected(self.handle):
pvd = self.cafe.getPVCache(self.handle)
if pvd.alarmSeverity in (self.cyca.SEV_MINOR, self.cyca.SEV_MAJOR) \
or self.cafe.hasAlarmStatusSeverity(self.handle):
self.color_mode = self.READBACK_ALARM
self.status_tip = ("Widget color mode is dynamic, " +
"pv with alarm limits")
else:
self.color_mode = self.READBACK_STATIC
self.status_tip = ("Widget color mode is static, " +
"pv without alarm limits")
def qt_property_initial_values(self, qt_object_name: str = None,
                               tool_tip: bool = True):
    '''Set Qt property values.

    Applies ``qt_object_name`` as the Qt object name, takes a private copy
    of the matching property dictionary and then sets the dynamic property
    for the resulting color mode, or the disconnected property when the
    channel is not connected.

    Args:
        qt_object_name: One of the PV_* object names.
        tool_tip: When true the pv name is set as the tool tip.
    '''
    self.qt_object_name = qt_object_name
    if tool_tip:
        self.setToolTip(self.pv_name)
    self.setObjectName(self.qt_object_name)

    templates = self.qt_object_to_property
    if self.qt_object_name not in templates.keys():
        print("qt_property_initial_values: Object not found in dictionary")
    else:
        self._qt_property_selected = copy.deepcopy(
            templates[self.qt_object_name])

    if not self.cafe.isConnected(self.handle):
        self.qt_property_disconnect()
        return

    if self.qt_object_name == self.PV_READBACK:
        self._qt_readback_color_mode()
    elif self.qt_object_name == self.PV_CONTROLLER:
        # A controller that does not act on beam is marked as such
        if self.color_mode != self.ACT_ON_BEAM:
            self.color_mode = self.NOT_ACT_ON_BEAM
    elif self.qt_object_name == self.PV_DAQ_CA:
        self._qt_readback_color_mode()
    elif self.qt_object_name == self.PV_DAQ_BS:
        self.color_mode = self.READBACK_STATIC
    self._qt_dynamic_property_set(self.color_mode)
def qt_dynamic_property_get(self, property_state: str = None):
    '''Retrieves the requested property value
    else that which is currently true

    Args:
        property_state: Name of the property whose flag is wanted. When
            None, the name of the first property that is set is returned.

    Returns:
        The flag of ``property_state``, or the name of the first property
        that is true; None when there is no match.
    '''
    flags = self._qt_property_selected
    if property_state is None:
        return next((name for name, flag in flags.items() if flag), None)
    for name, flag in flags.items():
        if name == property_state:
            return flag
    return None
def _qt_dynamic_property_set(self, property_state: str = None):
'''
Set the Input property to true, and the remainder to False
If None is given then all dynamic properties are set to False
'''
for _property in self._qt_property_selected.keys():
if _property == property_state:
self.setProperty(_property, True)
self._qt_property_selected[_property] = True
else:
self.setProperty(_property, False)
self._qt_property_selected[_property] = False
def qt_property_disconnect(self):
    '''Select the DISCONNECTED dynamic property and re-polish the style.'''
    disconnected = self.DISCONNECTED
    self._qt_dynamic_property_set(disconnected)
    self.qt_style_polish()
def qt_property_reconnect(self):
    '''Set Qt connected property value.

    Readback widgets re-evaluate their color mode; controller widgets not
    flagged as acting on beam get NOT_ACT_ON_BEAM. The resulting color
    mode is then applied as the dynamic property and the style polished.
    '''
    object_name = self.qt_object_name
    if object_name == self.PV_READBACK:
        self._qt_readback_color_mode()
    elif (object_name == self.PV_CONTROLLER
          and self.color_mode != self.ACT_ON_BEAM):
        self.color_mode = self.NOT_ACT_ON_BEAM
    self._qt_dynamic_property_set(self.color_mode)
    self.qt_style_polish()
def qt_property_alarm_sev_major(self):
    '''Select the MAJOR alarm property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.ALARM_SEV_MAJOR)
    status_tip = "{0} reports value in MAJOR alarm state!".format(
        self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_alarm_sev_minor(self):
    '''Select the MINOR alarm property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.ALARM_SEV_MINOR)
    status_tip = "{0} reports value in MINOR alarm state!".format(
        self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_alarm_sev_no_alarm(self):
    '''Select the READBACK_ALARM property (alarm-capable pv, no alarm).

    Also updates the status tip and re-polishes the style.
    '''
    self._qt_dynamic_property_set(self.READBACK_ALARM)
    status_tip = "{0} reports value in normal state".format(self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_alarm_sev_invalid(self):
    '''Select the INVALID alarm property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.ALARM_SEV_INVALID)
    status_tip = "PV={0} reports an INVALID value!".format(self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_static(self):
    '''Select the READBACK_STATIC property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.READBACK_STATIC)
    status_tip = "PV={0} does not have an alarm state".format(self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_daq_stopped(self):
    '''Select the DAQ_STOPPED property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.DAQ_STOPPED)
    status_tip = "PV={0} reports DAQ has stopped".format(self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_property_daq_paused(self):
    '''Select the DAQ_PAUSED property, update the status tip, re-polish.'''
    self._qt_dynamic_property_set(self.DAQ_PAUSED)
    status_tip = "PV={0} reports DAQ has paused".format(self.pv_name)
    self.setStatusTip(status_tip)
    self.qt_style_polish()
def qt_style_polish(self, redraw=False):
'''Re-apply the widget style after a property or object-name change.

Args:
    redraw: When true the style is unpolished and polished again, a
        StyleChange event is sent to the widget and update() and
        updateGeometry() are called; otherwise the style is only
        polished.
'''
if redraw:
self.style().unpolish(self)
self.style().polish(self)
event = QEvent(QEvent.StyleChange)
QApplication.sendEvent(self, event)
self.update()
self.updateGeometry()
else:
self.style().polish(self)
#Let Qt process pending events
QApplication.processEvents()
def pv_status_text_header(self, source="Channel Access"):
    """Return the rich-text header used for the pv status text.

    The header names the widget class, Qt object name and color mode, any
    press/release values, the data source, the pv name and, when one is
    available, the pv description (looked up through ``set_desc`` if not
    yet known).

    The data-source color is chosen by comparing the object name with
    ``PV_DAQ_BS`` for equality. The previous ``in`` test was a substring
    match on that string: it raised TypeError while the object name was
    still None and accepted partial names.

    Args:
        source: Label shown as the data source.

    Returns:
        The header as a str.
    """
    _source_separator = "----------------------------------------"
    _text = """
<p style = 'text-align: left';>
Widget: {0} ({1}, {2}) <br>
""".format(self.widget_class, self.qt_object_name, self.color_mode)
    if self.msg_press_value is not None:
        _text += """
<font color={1}>On press, sends value: {0} </font> <br>
""".format(self.msg_press_value, "DarkOrchid")
    if self.msg_release_value is not None:
        _text += """
<font color={1}>On release, sends value: {0} </font> <br>
""".format(self.msg_release_value, "DarkOrchid")

    if self.pv_within_daq_group and self.qt_object_name == self.PV_DAQ_BS:
        _ds_color = "Navy Blue"
    else:
        _ds_color = "Black"

    _text += """
{0} <br> <font color={4}>
Data source: {1} </font><br>
{0} <br>
PV: <font color={3}>{2}</font>
""".format(_source_separator, source, self.pv_name, "DarkOrchid",
           _ds_color)

    if self.desc is None:
        self.set_desc()
    # An empty description is omitted from the header
    if self.desc == "":
        _text += """</p>
"""
        return _text
    _text += """
<br>
Description:<font color={1}> {0} </font>
</p>
""".format(self.desc, "DarkOrchid")
    return _text
def pv_status_text_enum(self):
'''Return the rich-text line showing an enum pv's value.

The first element of ``self.pvd.value`` is shown together with its
counterpart obtained from cafe (the enum index for a string value,
the enum string otherwise). The font color is switched from "Blue"
to "White" for the DAQ states and connection conditions tested below.
'''
_val_enum = None
_value = self.pvd.value[0]
if isinstance(_value, str):
_val_enum = self.cafe.getEnumFromString(self.handle, _value)
elif _value is not None:
_val_enum = self.cafe.getStringFromEnum(self.handle, _value)
_color = "Blue"
#To catch case where channel is called by user
#To catch DAQ case
#NOTE(review): PV_DAQ_BS and PV_DAQ_CA are plain strings, so the "in"
#tests below are substring matches and raise TypeError while
#qt_object_name is None - equality looks intended, confirm
if self.pv_within_daq_group:
if self.qt_object_name in self.PV_DAQ_BS:
if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
self.DAQ_PAUSED,
self.DISCONNECTED):
_color = "White"
elif self.qt_object_name in self.PV_DAQ_CA:
if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
self.DISCONNECTED):
_color = "White"
elif not self.cafe.isConnected(self.handle):
_color = "White"
elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
_color = "White"
_text = """
<p style = 'text-align: left';>
Value: <font color={0}>{1} [{2}]</font> <br>
""".format(_color, _value, _val_enum)
return _text
def pv_status_text_data(self):
    '''Return the HTML "Value" line for a scalar or waveform pv.

    At most the first nine elements are listed; for longer waveforms
    they are followed by " ... " and the final element. Multi-element
    values are wrapped as "[ a b c ]". Every element, including the
    trailing one, is rendered by the same rules:

    * None is shown as '0';
    * str is shown verbatim and int via str();
    * any other value is formatted with the precision of
      self.pv_ctrl, or with str() when no control data is available
      (previously such values were silently omitted).

    The value is drawn in blue, or in white when the channel is
    disconnected or, for members of a DAQ group, when the DAQ is
    stopped/paused/disconnected.

    Returns:
        (str) HTML fragment opening a left-aligned paragraph.
    '''
    _first_end = 9  # number of leading elements listed explicitly

    def _element_text(element):
        # Single rendering path shared by leading and trailing elements.
        if element is None:
            return '0'
        if isinstance(element, str):
            return element
        if isinstance(element, int):
            return str(element)
        if self.pv_ctrl is None:
            return str(element)
        # A negative precision is not a valid format width; clamp to 0.
        _digits = max(self.pv_ctrl.precision, 0)
        return "{0:<.{1}f}".format(round(element, _digits), _digits)

    _nelem = self.pvd.nelem
    _end_range = min(_nelem, _first_end)
    _parts = [_element_text(self.pvd.value[i]) for i in range(_end_range)]
    if _nelem > _first_end:
        _parts.append("...")
        _parts.append(_element_text(self.pvd.value[_nelem - 1]))
    _value_str = " ".join(_parts)
    if _end_range > 1:
        _value_str = "[ " + _value_str + " ]"

    _color = "Blue"
    #To catch DAQ case
    if self.pv_within_daq_group:
        if self.qt_object_name in self.PV_DAQ_BS:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DAQ_PAUSED,
                                                  self.DISCONNECTED):
                _color = "White"
        elif self.qt_object_name in self.PV_DAQ_CA:
            if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
                                                  self.DISCONNECTED):
                _color = "White"
    elif not self.cafe.isConnected(self.handle):
        _color = "White"
    elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
        _color = "White"

    return ("\n"
            "<p style = 'text-align: left';>\n"
            "Value: <font color={0}>{1} {2}</font> <br>\n").format(
                _color, _value_str, self.units)
def pv_status_text_timestamp(self):
    '''Return the HTML "Timestamp" and "Status" lines of the report.

    The timestamp is blue and the status gray by default. Both turn
    white for a DAQ group member whose DAQ is
    stopped/paused/disconnected; outside a DAQ group only the
    timestamp turns white when the channel is down. Whenever the
    status differs from ICAFE_NORMAL the status text is shown in red.

    Returns:
        (str) HTML fragment.
    '''
    stamp_shade = "Blue"
    status_shade = "DimGray"
    if self.pv_within_daq_group:
        if self.qt_object_name in self.PV_DAQ_BS:
            idle_states = (self.DAQ_STOPPED, self.DAQ_PAUSED,
                           self.DISCONNECTED)
        elif self.qt_object_name in self.PV_DAQ_CA:
            idle_states = (self.DAQ_STOPPED, self.DISCONNECTED)
        else:
            idle_states = ()
        if idle_states and self.qt_dynamic_property_get() in idle_states:
            stamp_shade = status_shade = "White"
    elif (not self.cafe.isConnected(self.handle)
          or self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN):
        stamp_shade = "White"
    # A non-normal status always wins over the colours chosen above.
    if self.pvd.status != self.cyca.ICAFE_NORMAL:
        status_shade = "IndianRed"
    return ("\n"
            "Timestamp: <font color={0}>{2}</font> <br>\n"
            "Status: <font color={1}>{3} <br> {4} </font>\n").format(
                stamp_shade, status_shade, self.pvd.tsDateAsString,
                self.pvd.statusAsString,
                self.cafe.getStatusInfo(self.pvd.status))
def pv_status_text_alarm(self):
    '''Return the HTML alarm status/severity lines of the report.

    Colour follows the alarm severity (minor: yellow, major: red,
    invalid: white, otherwise gray). For a DAQ group member a
    stopped/paused/disconnected DAQ overrides the severity colour
    with white; outside a DAQ group a channel that is down is shown
    in white without consulting the severity.

    Returns:
        (str) HTML fragment.
    '''
    def _severity_shade():
        # First matching severity wins; anything else stays neutral.
        for level, shade in ((self.cyca.SEV_MINOR, "Yellow"),
                             (self.cyca.SEV_MAJOR, "Red"),
                             (self.cyca.SEV_INVALID, "White")):
            if self.pvd.alarmSeverity == level:
                return shade
        return "DimGray"

    if self.pv_within_daq_group:
        shade = _severity_shade()
        if self.qt_object_name in self.PV_DAQ_BS:
            idle_states = (self.DAQ_STOPPED, self.DAQ_PAUSED,
                           self.DISCONNECTED)
        elif self.qt_object_name in self.PV_DAQ_CA:
            idle_states = (self.DAQ_STOPPED, self.DISCONNECTED)
        else:
            idle_states = ()
        if idle_states and self.qt_dynamic_property_get() in idle_states:
            shade = "White"
    elif (not self.cafe.isConnected(self.handle)
          or self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN):
        shade = "White"
    else:
        shade = _severity_shade()

    return "\n" + (" <br>\n"
                   "Alarm status: <font color={0}>{1}</font> <br>\n"
                   "Alarm severity: <font color={0}>{2}</font>\n").format(
                       shade, self.pvd.alarmStatusAsString,
                       self.pvd.alarmSeverityAsString)
def pv_access(self):
    '''Return the channel access rights as text.

    The channel info is fetched through cafe on first use and kept
    in self.pv_info.

    Returns:
        (str) "Read", "Write", "ReadWrite" or "" according to the
        accessRead/accessWrite flags of the channel info.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)
    granted = []
    if self.pv_info.accessRead:
        granted.append("Read")
    if self.pv_info.accessWrite:
        granted.append("Write")
    return "".join(granted)
def pv_status_text_enum_metadata(self):
    '''Return the HTML metadata block for an enumerated pv.

    Lists the enum strings, native data type, record type, access
    rights and IOC host name.

    The channel info and record type are resolved here before they
    are formatted, in the same way as in pv_status_text_metadata:
    the info is fetched through cafe when not yet cached, and the
    record type falls back to the <record>.RTYP field when the class
    name is reported as "Not Supported" and self.check_rtyp is set.
    Without this the report could dereference an unloaded
    self.pv_info or show a record type of None.

    Returns:
        (str) HTML fragment closing the current paragraph and adding
        a new one.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)
    if self.pv_info is not None and self.record_type is None:
        if "Not Supported" in self.pv_info.className and self.check_rtyp:
            _rtyp_pv = self.pv_name.split(".")[0] + ".RTYP"
            _rtype = self.cafe.get(_rtyp_pv)
            self.record_type = _rtype if _rtype is not None else \
                self.pv_info.className
            # The RTYP channel is only needed for this one read.
            self.cafe.close(_rtyp_pv)
        else:
            self.record_type = self.pv_info.className
    _text = (" </p> <p>\n"
             "ENUM strings: <font color={1}>{2}</font> <br><br>\n"
             "Data type (native):<font color={0}> {3} </font><br>\n"
             "Record type: <font color={0}> {4} </font><br>\n"
             "RW Access:<font color={0}> {5} </font><br>\n"
             "IOC: <font color={0}> {6} </font></p>\n").format(
                 "MediumBlue", "DarkOrchid", self.pvc.enumStrings,
                 self.pv_info.dataTypeAsString,
                 self.record_type, self.pv_access(),
                 self.pv_info.hostName)
    return _text
def pv_status_text_metadata(self):
    '''Return the HTML metadata block for a non-enumerated pv.

    Channel info is fetched through cafe when not yet cached. When
    the record type is still unknown it is taken from the channel
    class name, or from the <record>.RTYP field when the class name
    contains "Not Supported" and self.check_rtyp is set.

    stringin/stringout records list data type, record type, access
    rights and IOC only; all other records additionally list the
    element count (when greater than one) and the pv precision.

    Returns:
        (str) HTML fragment closing the current paragraph and adding
        a new one.
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)
    if self.pv_info is not None and self.record_type is None:
        if "Not Supported" in self.pv_info.className and self.check_rtyp:
            rtyp_channel = self.pv_name.split(".")[0] + ".RTYP"
            reported = self.cafe.get(rtyp_channel)
            if reported is None:
                self.record_type = self.pv_info.className
            else:
                self.record_type = reported
            self.cafe.close(rtyp_channel)
        else:
            self.record_type = self.pv_info.className

    shade = "MediumBlue"
    # Lines common to both report flavours.
    channel_lines = ("Data type (native):<font color={0}> {1} </font><br>\n"
                     "Record type: <font color={0}> {2} </font><br>\n"
                     "RW Access:<font color={0}> {3} </font><br>\n"
                     "IOC: <font color={0}> {4} </font></p>\n")
    if self.record_type in ("stringin", "stringout"):
        return " </p> <p>\n" + channel_lines.format(
            shade, self.pv_info.dataTypeAsString, self.record_type,
            self.pv_access(), self.pv_info.hostName)

    pieces = [" </p> <p>\n"]
    if self.pvd.nelem > 1:
        pieces.append(("\n"
                       "Nelem: <font color={0}> {1} </font><br>\n").format(
                           shade, self.pvd.nelem))
    pieces.append(("\n"
                   "Precision (PV): <font color={0}>{1}</font> <br>\n").format(
                       shade, self.pvc.precision))
    pieces.append(channel_lines.format(
        shade, self.pv_info.dataTypeAsString, self.record_type,
        self.pv_access(), self.pv_info.hostName))
    return "".join(pieces)
def pv_status_text_alarm_limits(self):
    '''Return the HTML alarm/warning limits block of the report.

    Channel info and record type are resolved as in
    pv_status_text_metadata. Only a newline is returned when the
    record type is known and not listed in
    self._alarm_severity_record_types, when all four limits are
    zero, or when cafe reports no alarm status/severity for the
    channel.

    Returns:
        (str) HTML paragraph with the limits, or "\\n".
    '''
    if self.pv_info is None:
        self.pv_info = self.cafe.getChannelInfo(self.handle)
    if self.pv_info is not None and self.record_type is None:
        if "Not Supported" in self.pv_info.className and self.check_rtyp:
            rtyp_channel = self.pv_name.split(".")[0] + ".RTYP"
            reported = self.cafe.get(rtyp_channel)
            if reported is None:
                self.record_type = self.pv_info.className
            else:
                self.record_type = reported
            self.cafe.close(rtyp_channel)
        else:
            self.record_type = self.pv_info.className

    nothing = "\n"
    # Not all record types have alarms. The class name may be reported
    # as "Not Supported" by the gateway (see the original note: this is
    # the case at psi since the linux ca gateway was introduced); the
    # record type filter is skipped then.
    type_known = "Not Supported" not in str(self.record_type)
    if type_known and \
            self.record_type not in self._alarm_severity_record_types:
        return nothing

    limits = (self.pvc.lowerAlarmLimit, self.pvc.upperAlarmLimit,
              self.pvc.lowerWarningLimit, self.pvc.upperWarningLimit)
    if all(limit == 0 for limit in limits):
        return nothing
    if not self.cafe.hasAlarmStatusSeverity(self.handle):
        return nothing

    return (" <p>\n"
            "Lower/Upper <em>alarm</em> limit:&nbsp;&nbsp;&nbsp;\n"
            "<font color={0}>{1} &nbsp;/&nbsp; {4}</font> <br>\n"
            "Lower/Upper warning limit:\n"
            "<font color={0}>{2} &nbsp;/&nbsp; {3} </font>\n"
            "</p>\n").format(
                "MediumBlue",
                self.pvc.lowerAlarmLimit, self.pvc.lowerWarningLimit,
                self.pvc.upperWarningLimit, self.pvc.upperAlarmLimit)
def pv_status_text_display_limits(self):
    '''Return the HTML control/display limits block of the report.

    Returns:
        (str) HTML paragraph listing the control limits followed by
        the display limits, or "\\n" when all four limits are zero.
    '''
    ctrl = self.pvc
    unset = all(limit == 0 for limit in (
        ctrl.lowerDisplayLimit, ctrl.upperDisplayLimit,
        ctrl.lowerControlLimit, ctrl.upperControlLimit))
    if unset:
        return "\n"
    return (" <p>\n"
            "Lower/Upper <em>control</em> limit:\n"
            "<font color={0}>{3} &nbsp;/&nbsp; {4} </font> <br>\n"
            "Lower/Upper display limit:\n"
            "<font color={0}> {1} &nbsp;/&nbsp; {2} </font>\n"
            "</p>\n").format(
                "MediumBlue",
                ctrl.lowerDisplayLimit, ctrl.upperDisplayLimit,
                ctrl.lowerControlLimit, ctrl.upperControlLimit)
def pv_status_text(self):
    '''pv metadata to accompany widget's dialog box.

    Refreshes the cached control data (self.pvc) and, except for the
    beam synchronous DAQ source, the cached pv data (self.pvd), then
    assembles the HTML report from the pv_status_text_* methods,
    places it in self.pv_message_in_a_box and executes that dialog.
    '''
    # Let pending Qt events run before the caches are read.
    QApplication.processEvents()
    _source = "Channel Access"
    if self.pv_within_daq_group:
        # NOTE(review): the other pv_status_text_* methods test
        # `self.qt_object_name in self.PV_DAQ_BS` / `... in self.PV_DAQ_CA`
        # whereas equality is used here - confirm which is intended.
        if self.qt_object_name == self.PV_DAQ_BS:
            _source = "DAQ (Beam Synchronous)"
            #self.pvd written to in receive_daq_update
        elif self.qt_object_name == self.PV_DAQ_CA:
            _source = "DAQ (Channel Access)"
            self.pvd = self.cafe.getPVCache(self.handle)
        # Only a positive pulse id is appended to the source label.
        if self.pvd.pulseID > 0:
            _source += "<br>Pulse ID: {0}".format(self.pvd.pulseID)
    else:
        self.pvd = self.cafe.getPVCache(self.handle)
    self.pvc = self.cafe.getCtrlCache(self.handle)
    _text_data = """
"""
    if self.pvd.status == self.cyca.ECAFE_INVALID_HANDLE:
        # No valid handle: report this instead of pv data.
        _text_data = """<p> Status: <font color={0}>{1} <br> {2}</font> </p>
""".format("Blue",
           "Channel closed while DAQ in STOP state.",
           ("PV info requires DAQ to be in " +
            "RUN/PAUSED state"))
    elif self.pvd.status == self.cyca.ICAFE_CS_NEVER_CONN:
        # Channel never connected: show the cafe status text only.
        _text_data = """<p> Status: <font color={0}>{1} <br> {2}</font> </p>
""".format("Red", self.pvd.statusAsString,
           self.cafe.getStatusInfo(self.pvd.status))
    elif self.pvc.noEnumStrings > 0:
        # Enumerated pv: value with enum counterpart, no limits.
        _text_data = (self.pv_status_text_enum() +
                      self.pv_status_text_timestamp() +
                      self.pv_status_text_alarm() +
                      self.pv_status_text_enum_metadata())
    else:
        _text_data = (self.pv_status_text_data() +
                      self.pv_status_text_timestamp() +
                      self.pv_status_text_alarm() +
                      self.pv_status_text_metadata() +
                      self.pv_status_text_alarm_limits() +
                      self.pv_status_text_display_limits())
    self.pv_message_in_a_box.setText(
        self.pv_status_text_header(source=_source) + _text_data)
    QApplication.processEvents()
    self.pv_message_in_a_box.exec()
def lookup_archiver(self):
    '''Plot pvdata from archiver.

    Opens self.url_archiver followed by the pv name through the
    desktop's URL handler, e.g.
    https://ui-data-api.psi.ch/prepare?channel=sf-archiverappliance/<pv>
    A message is printed when the URL could not be opened.
    '''
    target = self.url_archiver + self.pv_name
    opened = QDesktopServices.openUrl(QUrl(target))
    if not opened:
        # No log-message facility is used here; the failure is printed.
        print("URL FOR ARCHIVER NOT FOUND", target)
def lookup_databuffer(self):
    '''Plot beam synchronous pvdata from databuffer.

    Opens self.url_databuffer followed by the pv name through the
    desktop's URL handler, e.g.
    https://ui-data-api.psi.ch/prepare?channel=sf-databuffer/<pv>
    A message is printed when the URL could not be opened; pending
    Qt events are processed before returning.
    '''
    target = self.url_databuffer + self.pv_name
    opened = QDesktopServices.openUrl(QUrl(target))
    if not opened:
        # No log-message facility is used here; the failure is printed.
        print("URL FOR DATA BUFFER NOT FOUND", target)
    QApplication.processEvents()
def strip_chart(self):
    '''PShell strip chart.

    Starts /sf/op/bin/strip_chart as a detached process with a
    configuration naming this pv as the single channel.
    '''
    program = "/sf/op/bin/strip_chart"
    channel_config = ('-config = [[[true,"' + self.pv_name +
                      '","Channel",1,1]]]')
    # NOTE(review): "&" is handed over as an ordinary argument here -
    # confirm that the strip_chart launcher expects it.
    arguments = ["-nlaf", "-start", channel_config, "&"]
    QProcess.startDetached(program, arguments)
def display_parameters(self):
    '''Open a dialog for adjusting how the pv value is displayed.

    The dialog is titled with the pv name and positioned relative
    to the context menu. It offers:

    * for float/double pvs whose control precision is not <= 0, a
      spin box for the user precision and a button showing the ioc
      precision, which resets the user precision when clicked;
    * a combo box for the refresh rate of the monitor.

    The dialog is executed modally; pending Qt events are processed
    once it has been closed.
    '''
    display_wgt = QDialog(self)
    # Place the dialog so that its right edge lines up with the right
    # edge of the context menu.
    _rect = display_wgt.geometry() #
    _parentRect = self.context_menu.geometry()
    _rect.moveTo(display_wgt.mapToGlobal(
        QPoint(_parentRect.x() + _parentRect.width() - _rect.width(),
               _parentRect.y())))
    display_wgt.setGeometry(_rect)
    display_wgt.setWindowTitle(self.pv_name)
    layout = QVBoxLayout()
    # Precision controls are withheld when the control data reports a
    # precision of zero or less.
    precision_flag = True
    if self.pv_ctrl is not None:
        if self.pv_ctrl.precision <= 0:
            precision_flag = False
    if self.cafe.getDataTypeNative(self.handle) in (
            self.cyca.CY_DBR_FLOAT,
            self.cyca.CY_DBR_DOUBLE) and precision_flag:
        #precision user
        _hbox_wgt = QWidget()
        _hbox = QHBoxLayout()
        precision_user_label = QLabel("Precision (user):")
        self.precision_user_wgt = QSpinBox(self)
        self.precision_user_wgt.setFocusPolicy(Qt.NoFocus)
        self.precision_user_wgt.setValue(int(self.precision))
        # The user precision is capped by the ioc precision, or by 6
        # when no control data is available.
        if self.pv_ctrl is not None:
            _max = self.pv_ctrl.precision
        else:
            _max = 6
        self.precision_user_wgt.setMaximum(_max)
        self.precision_user_wgt.valueChanged.connect(
            self.precision_user_changed)
        _hbox.addWidget(precision_user_label)
        _hbox.addWidget(self.precision_user_wgt)
        _hbox_wgt.setLayout(_hbox)
        precision_user_label.setFixedWidth(110)
        self.precision_user_wgt.setFixedWidth(35)
        _hbox_wgt.setFixedWidth(160)
        #precision ioc
        _hbox2_wgt = QWidget()
        _hbox2 = QHBoxLayout()
        precision_ioc_label = QLabel("Precision (ioc): ")
        # The button text shows the cap computed above; clicking it
        # calls precision_ioc_reset.
        precision_ioc = QPushButton(self)
        precision_ioc.setText(" {} ".format(_max))
        precision_ioc.clicked.connect(self.precision_ioc_reset)
        _hbox2.addWidget(precision_ioc_label)
        _hbox2.addWidget(precision_ioc)
        _hbox2_wgt.setLayout(_hbox2)
        precision_ioc_label.setFixedWidth(110)
        precision_ioc.setFixedWidth(20)
        _hbox2_wgt.setFixedWidth(145)
        layout.addWidget(_hbox_wgt)
        layout.addWidget(_hbox2_wgt)
    #refresh rate
    _hbox3_wgt = QWidget()
    _hbox3 = QHBoxLayout()
    refresh_freq_label = QLabel("Refresh rate: ")
    # A non-positive default frequency is presented as 0 ('direct').
    _default_refresh_val = 0 if self.notify_freq_hz_default <= 0 else \
        self.notify_freq_hz_default
    # Combo box index -> refresh frequency in Hz (0 means 'direct');
    # refresh_rate_changed looks the selected index up in this dict.
    self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1,
                                         5: 0.5, 6: _default_refresh_val}
    refresh_freq = QComboBox(self)
    refresh_freq.addItem('direct')
    refresh_freq.addItem('{0} Hz'.format(
        self.refresh_freq_combox_idx_dict[1]))
    refresh_freq.addItem('{0} Hz'.format(
        self.refresh_freq_combox_idx_dict[2]))
    refresh_freq.addItem('{0} Hz'.format(
        self.refresh_freq_combox_idx_dict[3]))
    refresh_freq.addItem('{0} Hz'.format(
        self.refresh_freq_combox_idx_dict[4]))
    refresh_freq.addItem('{0} Hz'.format(
        self.refresh_freq_combox_idx_dict[5]))
    _default_text = 'default (direct)' if _default_refresh_val == 0 else \
        'default ({0} Hz)'.format(self.refresh_freq_combox_idx_dict[6])
    refresh_freq.addItem(_default_text)
    # Preselect the first entry matching the current frequency; this is
    # done before the signal is connected below.
    for key, value in self.refresh_freq_combox_idx_dict.items():
        if value == self.notify_freq_hz:
            refresh_freq.setCurrentIndex(key)
            break
    refresh_freq.currentIndexChanged.connect(self.refresh_rate_changed)
    _hbox3.addWidget(refresh_freq_label)
    _hbox3.addWidget(refresh_freq)
    _hbox3_wgt.setLayout(_hbox3)
    refresh_freq_label.setFixedWidth(110)
    refresh_freq.setFixedWidth(115)
    _hbox3_wgt.setFixedWidth(235)
    layout.addWidget(_hbox3_wgt)
    layout.setAlignment(Qt.AlignLeft)
    layout.setContentsMargins(10, 0, 0, 0)
    layout.setSpacing(0)
    display_wgt.setMinimumWidth(340)
    display_wgt.setLayout(layout)
    display_wgt.exec()
    QApplication.processEvents()
def precision_ioc_reset(self):
    '''Reset the user precision to the precision of the control data.

    When control data is available both self.precision_user and
    self.precision take its precision. The precision spin box is
    then set to self.precision unless that is None.
    '''
    ctrl = self.pv_ctrl
    if ctrl is not None:
        self.precision_user = ctrl.precision
        self.precision = ctrl.precision
    if self.precision is None:
        return
    self.precision_user_wgt.setValue(self.precision)
def precision_user_changed(self, new_value):
    '''Adopt a new user precision and redraw the current value.

    Args:
        new_value: (int) Precision selected in the spin box.

    When the first cached element of the pv is a float it is
    re-emitted through trigger_monitor_float together with the
    cached status and alarm severity.
    '''
    self.precision_user = new_value
    self.precision = new_value
    cached = self.cafe.getPVCache(self.handle)
    first = cached.value[0]
    if first is not None and isinstance(first, float):
        self.trigger_monitor_float.emit(
            cached.value[0], cached.status, cached.alarmSeverity)
def refresh_rate_changed(self, new_idx):
    '''Apply the refresh rate selected in the combo box.

    Args:
        new_idx: (int) Combo box index, looked up in
            self.refresh_freq_combox_idx_dict to obtain the
            frequency in Hz (0 stands for no delay).

    The notification period in milliseconds is derived from the
    frequency. A widget notified in unison leaves that mode and has
    its monitor restarted; otherwise the delay of the running
    monitor is updated through cafe.
    '''
    rate_hz = self.refresh_freq_combox_idx_dict[new_idx]
    if rate_hz == 0:
        self.notify_milliseconds = 0
    else:
        self.notify_milliseconds = 1000 / rate_hz
    self.notify_freq_hz = rate_hz
    if not self.notify_unison:
        self.cafe.updateMonitorPolicyDeltaMS(
            self.handle, self.monitor_id, self.notify_milliseconds)
        return
    self.notify_unison = False
    self.monitor_stop()
    self.monitor_start()
#See https://doc.qt.io/qt-5.9/qtwidgets-mainwindows-menus-example.html
#Since Qt5 contextMenuEvent has to be implemented in order to prevent
#the "Select All" dialogue button from appearing.
def contextMenuEvent(self, event):
    '''Deliberately do nothing on a context menu event.

    Args:
        event: The context menu event; it is not used.
    '''
    return None
def showContextMenu(self):
    '''Execute the widget's context menu at the cursor position.'''
    cursor_position = QCursor.pos()
    self.context_menu.exec(cursor_position)
def mousePressEvent(self, event):
    '''Action on mouse press event.

    A press of the right mouse button executes the widget's context
    menu at the current cursor position and then clears the widget's
    keyboard focus.

    Args:
        event: Mouse event; only its button() is inspected.
    '''
    button = event.button()
    if button == Qt.RightButton:
        self.context_menu.exec(QCursor.pos())
        self.clearFocus()
def mouseReleaseEvent(self, event):
    '''Mark the mouse release event as ignored.'''
    event.ignore()