diff --git a/pvgateway.py- b/pvgateway.py-
deleted file mode 100644
index 2d6d64d..0000000
--- a/pvgateway.py-
+++ /dev/null
@@ -1,1690 +0,0 @@
-"""
-The module provides data and metadata of a process variable through
-PyCafe.
-"""
-__author__ = 'Jan T. M. Chrin'
-
-import copy
-from enum import IntEnum
-import inspect
-import time
-
-from distutils.version import LooseVersion
-
-from qtpy.QtCore import (QEvent, QMutex, QPoint, QProcess, QSettings, Qt, QUrl,
- Signal)
-from qtpy.QtCore import __version__ as QT_VERSION_STR
-from qtpy.QtGui import QCursor, QDesktopServices, QFont
-from qtpy.QtWidgets import (QAction, QApplication, QComboBox, QDialog,
- QHBoxLayout, QLabel, QMenu, QMessageBox,
- QPushButton, QSpinBox, QVBoxLayout, QWidget)
-
-def __LINE__():
- return inspect.currentframe().f_back_f_lineno
-
-class DAQState(IntEnum):
- BS = 10
- CA = 20
- BS_STOP = 30
- CA_STOP = 40
- BS_PAUSE = 50
- CA_PAUSE = 60
-
-class PVGateway(QWidget):
- """Retrieves pv metadata through PyCafe.
-
- The PVGateway class when subclassed by Qt widgets enables their
- connectivity to channel access.
-
- Attributes:
- monid: (int) Monitor id
- units : (str) Units associated with the pv
-
- trigger_monitor_ is the signal triggered by updates
- arising from monitored pvs.
- trigger_connect is the signal triggered from changes in pv
- connection status.
- widget_handle_dict is a dictionary mapping widgets to their pv
- handle.
- A pv handle may be associated to more than one widget.
- """
- trigger_monitor_float = Signal(float, int, int)
- trigger_monitor_int = Signal(int, int, int)
- trigger_monitor_str = Signal(str, int, int)
- trigger_monitor = Signal(object, int) #pvdata, status
-
-
- trigger_connect = Signal(int, str, int)
-
- trigger_daq = Signal(object, str, int)
- trigger_daq_int = Signal(object, str, int)
- trigger_daq_str = Signal(object, str, int)
-
- #Properties, user supplied
- ACT_ON_BEAM = 'actOnBeam'
- NOT_ACT_ON_BEAM = 'notActOnBeam'
- READBACK_ALARM = 'alarm'
- READBACK_STATIC = 'static'
-
- #Properties, dynamic
- DISCONNECTED = 'disconnected'
- ALARM_SEV_MINOR = 'alarmSevMinor'
- ALARM_SEV_MAJOR = 'alarmSevMajor'
- ALARM_SEV_INVALID = 'alarmSevInvalid'
- ALARM_SEV_NO_ALARM = READBACK_ALARM
- DAQ_STOPPED = 'stopped'
- DAQ_PAUSED = 'paused'
-
- #ObjectName, defined by CAQ
- PV_CONTROLLER = "Controller"
- PV_READBACK = "Readback"
- PV_DAQ_BS = "BSRead"
- PV_DAQ_CA = "CARead"
-
- _DAQ_CAFE_SG_NAME = "gBS2CA"
-
- _alarm_severity_record_types = ["ai", "ao", "calc", "calcout", "dfanout",
- "longin", "longout", "pid", "sel",
- "steppermotor", "sub"]
-
- #parent is Gui
- def __init__(self, parent=None, pv_name: str = "", monitor_callback=None,
- pv_within_daq_group: bool = False, color_mode=None,
- show_units: bool = False, prefix: str = "", suffix: str = "",
- connect_callback=None, msg_label: str = "",
- connect_triggers: bool = True, notify_freq_hz: int = 0,
- notify_unison: bool = False, precision: int = 0,
- monitor_dbr_time: bool = False):
-
- super().__init__()
-
- if parent is None:
- return
-
- if not pv_name:
- return
-
- self.connect_callback = connect_callback
- self.notify_freq_hz = abs(notify_freq_hz)
- self.notify_freq_hz_default = self.notify_freq_hz
-
- self.notify_milliseconds = 0 if self.notify_freq_hz == 0 else \
- 1000 / self.notify_freq_hz
-
- self.notify_unison = bool(notify_unison) and bool(self.notify_freq_hz)
-
- self.parent = parent
- self.settings = self.parent.settings
-
- self.pv_name = pv_name
-
- self.color_mode = None
-
- if color_mode is not None:
- if color_mode in (self.ACT_ON_BEAM,
- self.NOT_ACT_ON_BEAM,
- self.READBACK_ALARM,
- self.READBACK_STATIC):
- self.color_mode = color_mode
-
- self.color_mode_requested = self.color_mode
-
- if monitor_callback is not None:
- self.monitor_callback = monitor_callback
- else:
- self.monitor_callback = None
-
- self.pv_within_daq_group = pv_within_daq_group
-
- self.show_units = show_units
- self.prefix = prefix
- self.suffix = suffix
-
- self.cafe = self.parent.cafe
- self.cyca = self.parent.cyca
-
- if self.parent.settings is not None:
- self.url_archiver = self.parent.settings.data["url"]["archiver"]
- self.url_databuffer = self.parent.settings.data["url"]["databuffer"]
- self.bg_readback = self.parent.settings.data["StyleGuide"][
- "bgReadback"]
- self.fg_alarm_major = self.parent.settings.data["StyleGuide"][
- "fgAlarmMajor"]
- self.fg_alarm_minor = self.parent.settings.data["StyleGuide"][
- "fgAlarmMinor"]
- self.fg_alarm_invalid = self.parent.settings.data["StyleGuide"][
- "fgAlarmInvalid"]
- self.fg_alarm_noalarm = self.parent.settings.data["StyleGuide"][
- "fgAlarmNoAlarm"]
- else:
- #self.settings = ReadJSON(self.parent.appname)
- self.url_archiver = ("https://ui-data-api.psi.ch/prepare?channel=" +
- "sf-archiverappliance/")
- self.url_databuffer \
- = "https://ui-data-api.psi.ch/prepare?channel=sf-databuffer/"
-
- self.daq_group_name = self._DAQ_CAFE_SG_NAME
- self.desc = None
- self.handle = None
- self.initialize_complete = False
- self.initialize_again = False
-
- self.msg_label = msg_label
- self.msg_press_value = None
- self.msg_release_value = None
-
- self.monitor_id = None
- self.monitor_dbr_time = monitor_dbr_time
- self.mutex_post_display = QMutex()
-
- self.precision_user = precision
- self.has_precision_user = bool(precision)
- self.precision_pv = 3
-
- self.precision = (self.precision_user if self.has_precision_user else
- self.precision_pv)
-
- self.pvd = None
- self.pv_ctrl = None
- self.pv_info = None
- self.record_type = None
-
- #if 'show_log_message' in dir(self.parent):
- # self.show_log_message = self.parent.show_log_message
- #else:
- # self.show_log_message = None
-
- self.qt_object_name = None
-
- self.qt_property_controller = {
- self.DISCONNECTED: False,
- self.ACT_ON_BEAM: False, self.NOT_ACT_ON_BEAM: False
- }
-
- self.qt_property_readback = {
- self.DISCONNECTED: False,
- self.READBACK_ALARM: False, self.READBACK_STATIC: False,
- self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
- self.ALARM_SEV_INVALID: False
- }
-
- self.qt_property_daq_bs = {
- self.DISCONNECTED: False,
- self.READBACK_ALARM: False, self.READBACK_STATIC: False,
- self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
- self.ALARM_SEV_INVALID: False,
- self.DAQ_STOPPED: False, self.DAQ_PAUSED: False
- }
-
- self.qt_property_daq_ca = {
- self.DISCONNECTED: False,
- self.READBACK_ALARM: False, self.READBACK_STATIC: False,
- self.ALARM_SEV_MINOR: False, self.ALARM_SEV_MAJOR: False,
- self.ALARM_SEV_INVALID: False,
- self.DAQ_STOPPED: False, self.DAQ_PAUSED: False
- }
-
- self.qt_object_to_property = {
- self.PV_CONTROLLER: self.qt_property_controller,
- self.PV_READBACK: self.qt_property_readback,
- self.PV_DAQ_BS: self.qt_property_daq_bs,
- self.PV_DAQ_CA: self.qt_property_daq_ca
- }
-
- self._qt_property_selected = {}
-
- self.status_tip = None
- self.suggested_text = ""
- self.time_monotonic = time.monotonic()
- self.pvd_previous = None
- self.timeout = 0.2
- self.units = ""
-
- self.widget = self
-
- _widget_name_part = str(self.widget.__class__).split("\'")[1].split(".")
- #_widget_class_part = _widget_name_part[1].split(".")
- self.widget_class = _widget_name_part[len(_widget_name_part)-1]
-
- if pv_within_daq_group:
- self.trigger_daq_int.connect(self.receive_daq_update)
- self.trigger_daq.connect(self.receive_daq_update)
- self.trigger_daq_str.connect(self.receive_daq_update)
-
- elif connect_triggers:
- self.trigger_monitor.connect(self.receive_monitor_dbr_time)
- self.trigger_monitor_str.connect(self.receive_monitor_update)
- self.trigger_monitor_int.connect(self.receive_monitor_update)
- self.trigger_monitor_float.connect(self.receive_monitor_update)
- self.trigger_connect.connect(self.receive_connect_update)
-
- self.context_menu = QMenu()
- self.context_menu.setObjectName("contextMenu")
- self.context_menu.setWindowModality(Qt.NonModal) #ApplicationModal
- if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.0"):
- self.context_menu.addSection("PV: {0}".format(self.pv_name))
-
- action1 = QAction("Text Info", self)
- action1.triggered.connect(self.pv_status_text)
-
- action2 = QAction("Lookup in Archiver", self)
- action2.triggered.connect(self.lookup_archiver)
-
- action3 = QAction("Lookup in Databuffer", self)
- action3.triggered.connect(self.lookup_databuffer)
-
- action4 = QAction("Strip Chart (PShell)", self)
- action4.triggered.connect(self.strip_chart)
-
- action6 = QAction("Configure Display Parameters", self)
- action6.triggered.connect(self.display_parameters)
-
- self.context_menu.addAction(action1)
- self.context_menu.addAction(action2)
- self.context_menu.addAction(action3)
- self.context_menu.addAction(action4)
-
- action5 = QAction("Reconnect: {0}".format(self.pv_name), self)
- action5.triggered.connect(self.reconnect_channel)
- _font = QFont()
- _font.setPixelSize(12)
- action5.setFont(_font)
-
- if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
- self.context_menu.addSection("")
-
- #return action6 and 5 code here eventually
- self.context_menu.addAction(action6)
- self.context_menu.addAction(action5)
-
- self.pv_message_in_a_box = QMessageBox()
- self.pv_message_in_a_box.setObjectName("pvinfo")
-
- #Qt.ApplicationModal not used as it blocks input to all windows
- self.pv_message_in_a_box.setWindowModality(Qt.NonModal)
- self.pv_message_in_a_box.setIcon(QMessageBox.Information)
- self.pv_message_in_a_box.setStandardButtons(QMessageBox.Close)
- self.pv_message_in_a_box.setDefaultButton(QMessageBox.Close)
-
- self.initialize()
-
- #return self - previously used by pvgateway
-
-
- def initialize(self):
- '''Initialze class attributes and connect to ca if required.'''
-
- _handle_within_group_flag = False
- if self.pv_within_daq_group:
- self.handle = self.cafe.getHandleFromPVWithinGroup(
- self.pv_name, self.daq_group_name)
- if self.handle > 0:
- self.cafe.addWidget(self.handle, self.widget)
- _handle_within_group_flag = True
- #Callback already invoked to emit signal here!!
- _channel_info = self.cafe.getChannelInfo(self.handle)
-
- #wgts = self.cafe.getWidgets(self.handle)
-
- self.trigger_connect.emit(
- int(self.handle), str(self.pv_name),
- int(_channel_info.cafeConnectionState))
- #In case user is misinformed
- if not _handle_within_group_flag:
- self.handle = self.cafe.getHandleFromPV(self.pv_name)
- if self.connect_callback is None:
- self.connect_callback = self.py_connect_callback
-
- if self.handle > 0:
- #The second time round, widget is gateway rather than parent,
- #Why is that?
- self.cafe.setPyConnectCallbackFn(self.handle,
- self.connect_callback)
-
- self.cafe.addWidget(self.handle, self.widget)
-
- _channel_info = self.cafe.getChannelInfo(self.handle)
- self.trigger_connect.emit(
- self.handle, self.pv_name,
- int(_channel_info.cafeConnectionState))
-
- else:
- self.cafe.openPrepare()
- self.handle = self.cafe.open(self.pv_name,
- self.connect_callback)
- self.cafe.addWidget(self.handle, self.widget)
- self.cafe.openNowAndWait(self.timeout, self.handle)
-
- self.initialize_meta_data()
-
- self.pv_message_in_a_box.setWindowTitle(self.pv_name)
-
-
- def initialize_meta_data(self):
-
- _current_value = ""
-
- if self.cafe.isConnected(self.handle) and \
- self.cafe.initCallbackComplete(self.handle):
-
- if self.pvd is None:
- self.pvd = self.cafe.getPVCache(self.handle)
-
- if self.pv_ctrl is None:
- self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
- self.set_precision_and_units()
-
- if self.pv_info is None:
- self.pv_info = self.cafe.getChannelInfo(self.pv_name)
- if "Not Supported" in self.pv_info.className:
- _rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
- self.record_type = _rtype if _rtype is not None else \
- self.pv_info.className
- _rtype = self.cafe.close(self.pv_name.split(".")[0] +
- ".RTYP")
- else:
- self.record_type = self.pv_info.className
-
- _current_value = self.cafe.getCache(self.handle)
- if isinstance(_current_value, (int, float)):
- #space for positive numbers
- _value_form = ("{:<+.%sf}" % self.precision)
- _current_value = _value_form.format(
- round(_current_value, self.precision))
-
- #Reset
- self.initialize_complete = True
-
- #verify user input
- if self.show_units is True:
- if self.suffix == self.units and self.units != "":
- self.show_units = False
-
- self.suggested_text = self.prefix
- if self.prefix:
- self.suggested_text += " "
-
- _suggested_text_from_value = " "
-
- _max_control_abs = 0
-
- if self.pv_ctrl is not None:
- _lower_control_abs = abs(int(self.pv_ctrl.lowerControlLimit))
- _upper_control_abs = abs(int(self.pv_ctrl.upperControlLimit))
- _max_control_abs = max(_lower_control_abs, _upper_control_abs)
- if _max_control_abs is None:
- _max_control_abs = 0
-
- _enum_list = self.pv_ctrl.enumStrings
-
- if _enum_list:
- _enum_list_member_max_length = 0
- _enum_list_member_max_index = 0
-
- for i in range(0, len(_enum_list)):
- if len(_enum_list[i]) > _enum_list_member_max_length:
- _enum_list_member_max_length = len(_enum_list[i])
- _enum_list_member_max_index = i
- _suggested_text_from_value += \
- _enum_list[_enum_list_member_max_index] + "."
- else:
- if self.pv_ctrl.lowerControlLimit < 0:
- _suggested_text_from_value += "-"
- _suggested_text_from_value += str(_max_control_abs) + "."
-
- self.precision = min(9, self.precision) #safety net
- for i in range(0, self.precision):
- _suggested_text_from_value += "0"
-
- if len(_current_value) > len(_suggested_text_from_value):
- _suggested_text_from_value = _current_value
-
- self.suggested_text += _suggested_text_from_value
-
- if self.show_units:
- self.suggested_text += " " + self.units
- self.suggested_text += self.suffix
-
- _suggested_text_length = len(self.suggested_text)
- self.suggested_text = self.suggested_text.center(
- _suggested_text_length+2)
-
- self.max_control_abs_str = str(_max_control_abs)
-
- _max_control_abs_length = len(self.max_control_abs_str)
- _offset = 9
- self.max_control_abs_str = self.max_control_abs_str.center(
- _max_control_abs_length + _offset)
-
- qsettings = QSettings()
- qsettings.beginGroup("Widget")
- qsettings.beginGroup(self.pv_name)
- qsettings.beginGroup(self.widget_class)
-
- _var_text = "suggested_text"
- _ctrl_abs = "max_control_abs_str"
-
- if self.cafe.isConnected(self.handle) and \
- self.cafe.initCallbackComplete(self.handle):
- qsettings.setValue(_var_text, self.suggested_text)
- qsettings.setValue(_ctrl_abs, self.max_control_abs_str)
- else:
- if qsettings.value(_var_text) is not None:
- self.suggested_text = qsettings.value(_var_text)
- if qsettings.value(_ctrl_abs) is not None:
- self.max_control_abs_str = qsettings.value(_ctrl_abs)
-
- qsettings.endGroup()
- qsettings.endGroup()
- qsettings.endGroup()
-
-
- def is_initialize_complete(self):
- icount = 0
- while not self.initialize_complete:
- time.sleep(0.01)
- self.initialize_meta_data()
- icount += 1
- if icount > 50:
- return False
- return True
-
- def cleanup(self, close_pv=True):
- '''Clean up the widget.'''
-
- #Make sure mon id is valid
- if self.handle > 0:
- _monID_list = self.cafe.getMonitorIDs(self.handle)
- if self.monitor_id in _monID_list:
- self.cafe.monitorStop(self.handle, self.monitor_id)
-
- #Do not close of there are other monitors
- if self.cafe.getNoMonitors(self.handle) > 0:
- if close_pv is True:
- self.cafe.close(self.pv_name)
- self.widget.deleteLater()
-
-
- def format_display_value(self, value):
-
- if value is None:
- print(self, self.pv_name, ">>>>format_display_value is None")
- #return
-
- if isinstance(value, str):
- _value_str = value
- elif isinstance(value, int):
- _value_str = str(value)
- else:
- _value_form = ("{:< .%sf}" % self.precision)
- _rounded_value = round(value, self.precision)
- _value_str = _value_form.format(_rounded_value)
-
- if self.show_units:
- _value_str += " " + self.units + " "
- if self.suffix:
- _value_str += " " + self.suffix + " "
-
- if self.prefix:
- _space = ""
- if self.pv_ctrl is not None:
- if self.pv_ctrl.lowerDisplayLimit < 0:
- _space = " "
- _value_str = self.prefix + _space + _value_str
-
- return _value_str
-
- def post_display_value(self, value):
-
- _value_str = self.format_display_value(value)
-
- if "setText" in dir(self):
-
- if LooseVersion(QT_VERSION_STR) >= LooseVersion("5.3"):
- self.blockSignals(True)
- self.setText(_value_str)
- self.blockSignals(False)
- else:
- self.setText(_value_str)
-
- else:
- print("setText method does not exist for this widget class:\n",
- self.widget.__class__)
- print("sender was: ", self.sender())
-
-
- def py_connect_callback(self, handle, pvname, status):
- '''Callback function to be invoked on change of pv connection status.
- Checks for existence of widget. Waits up to a maximun of 100 ms.
- '''
- pv_name = pvname
- self.trigger_connect.emit(int(handle), str(pv_name), int(status))
-
- def receive_connect_update(self, handle, pv_name, status,
- post_display=True):
- '''Triggered by connect signal. For Widget to overload.'''
-
- if pv_name is not None:
- if pv_name != self.pv_name:
- print(("pv_name {0} in receive_connect_update " +
- "does not match: {1}").format(pv_name, self.pv_name))
-
- if status == self.cyca.ICAFE_CS_CONN:
- self.initialize_connect = True
- self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
- self.pv_info = self.cafe.getChannelInfo(self.handle)
- if self.pv_info is not None and self.record_type is None:
- if "Not Supported" in self.pv_info.className:
- _rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
- self.record_type = _rtype if _rtype is not None else \
- self.pv_info.className
- _rtype = self.cafe.close(self.pv_name.split(".")[0] +
- ".RTYP")
- else:
- self.record_type = self.pv_info.className
-
- self.set_precision_and_units(reconnectFlag=True)
-
- if not self.msg_label:
- _value = self.cafe.getCache(handle, dt='native')
- #Another reconnection in progress!!!
-
- if _value is None:
- return
- else:
- _value = self.msg_label
-
- if post_display:
- self.post_display_value(_value)
- self.qt_property_reconnect()
-
- else:
- self.qt_property_disconnect()
-
-
- if status == self.cyca.ICAFE_CS_CLOSED:
- self.initialize_again = True
-
- elif self.initialize_again:
- #monitos_id informs whether or not widget has a monitor
- #CAQMessageButton for instance does not have a monitor
-
- if not self.pv_within_daq_group and self.monitor_id is not None:
- self.monitor_start()
- self.initialize_again = False
-
- return
-
-
- def receive_daq_update(self, daq_pvd, daq_mode, daq_state):
- ''' DAQ mode is widget specific.
- DAQ may be in BS mode, but channels within DAQ stream that
- are not BS enabled will be flagged as CA Mode, i.e., CARead
- '''
-
- _current_qt_dynamic_property = self.qt_dynamic_property_get()
-
- alarm_severity = daq_pvd.alarmSeverity
- self.pvd = daq_pvd
-
- if daq_mode != self.qt_object_name:
- self.qt_object_name = daq_mode
- self.setObjectName(self.qt_object_name)
- self.qt_style_polish()
-
- if daq_state in (self.cyca.ICAFE_DAQ_STOPPED,):
- if _current_qt_dynamic_property != self.DAQ_STOPPED:
- self.qt_property_daq_stopped()
-
- elif daq_state in (self.cyca.ICAFE_DAQ_PAUSED,):
- if _current_qt_dynamic_property != self.DAQ_PAUSED:
- self.qt_property_daq_paused()
-
- elif daq_state in (self.cyca.ICAFE_DAQ_RUN,):
- if daq_mode == self.PV_DAQ_BS and \
- _current_qt_dynamic_property != self.READBACK_STATIC:
- self.qt_property_static()
-
- elif daq_mode == self.PV_DAQ_CA:
- if self.color_mode != self.color_mode_requested:
- self.color_mode = self.color_mode_requested
-
- if self.cafe.isEnum(self.handle) and \
- isinstance(daq_pvd.value[0], int):
- _value = self.cafe.getStringFromEnum(self.handle,
- daq_pvd.value[0])
- else:
- _value = daq_pvd.value[0]
-
- if daq_pvd.status == self.cyca.ICAFE_NORMAL:
- if self.msg_label == "":
- self.post_display_value(_value)
- if daq_mode == self.PV_DAQ_BS:
- return
-
- #Check if color settings are correct
- if alarm_severity > self.cyca.SEV_NO_ALARM:
- self.color_mode = self.READBACK_ALARM
- self.color_mode_requested = self.READBACK_ALARM
-
- if self.color_mode == self.READBACK_ALARM:
- if alarm_severity == self.cyca.SEV_MINOR:
- if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
- self.qt_property_alarm_sev_minor()
-
- elif alarm_severity == self.cyca.SEV_MAJOR:
- if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
- self.qt_property_alarm_sev_major()
-
- elif alarm_severity == self.cyca.SEV_INVALID:
- if _current_qt_dynamic_property != \
- self.ALARM_SEV_INVALID:
- self.qt_property_alarm_sev_invalid()
-
- elif alarm_severity == self.cyca.SEV_NO_ALARM:
- if _current_qt_dynamic_property != \
- self.ALARM_SEV_NO_ALARM:
- self.qt_property_alarm_sev_no_alarm()
-
- elif _current_qt_dynamic_property != self.READBACK_STATIC:
- self.qt_property_static()
-
- else:
- if _current_qt_dynamic_property != self.DISCONNECTED:
- self.qt_property_disconnect()
-
-
- def receive_monitor_dbr_time(self, pvdata, alarm_severity):
- print("called from gateway", self.pv_name, alarm_severity)
- pvdata.show()
-
- def receive_monitor_update(self, value, status, alarm_severity):
- '''Triggered by monitor signal. For Widget to overload.'''
-
- self.mutex_post_display.lock()
- _current_qt_dynamic_property = self.qt_dynamic_property_get()
-
- if status == self.cyca.ICAFE_NORMAL:
-
- if self.msg_label == "":
- self.post_display_value(value)
-
- #For DAQ when channel connects after application start-up
- if _current_qt_dynamic_property == self.DISCONNECTED:
- self.qt_property_initial_values(qt_object_name=self.PV_READBACK)
-
- #Check if color settings are correct
- elif _current_qt_dynamic_property == self.READBACK_STATIC:
- if alarm_severity > self.cyca.SEV_NO_ALARM:
- if alarm_severity < self.cyca.SEV_INVALID:
- self.color_mode = self.READBACK_ALARM
- self.status_tip = ("Widget color mode is dynamic, " +
- "pv with alarm limits")
- elif alarm_severity == self.cyca.SEV_INVALID:
- if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
- self.qt_property_alarm_sev_invalid()
-
- if self.color_mode == self.READBACK_ALARM:
- if alarm_severity == self.cyca.SEV_MINOR:
- if _current_qt_dynamic_property != self.ALARM_SEV_MINOR:
- self.qt_property_alarm_sev_minor()
-
- elif alarm_severity == self.cyca.SEV_MAJOR:
- if _current_qt_dynamic_property != self.ALARM_SEV_MAJOR:
- self.qt_property_alarm_sev_major()
-
- elif alarm_severity == self.cyca.SEV_INVALID:
- if _current_qt_dynamic_property != self.ALARM_SEV_INVALID:
- self.qt_property_alarm_sev_invalid()
-
- elif alarm_severity == self.cyca.SEV_NO_ALARM:
- if _current_qt_dynamic_property != self.ALARM_SEV_NO_ALARM:
- self.qt_property_alarm_sev_no_alarm()
-
- else:
- if _current_qt_dynamic_property != self.DISCONNECTED:
- self.qt_property_disconnect()
-
- self.mutex_post_display.unlock()
-
- def py_monitor_callback(self, handle, pvname, pvdata):
-
- '''Callback function to be invoked on change of pv value.
- cafe.getCache and cafe.set operations permitted within callback.
- '''
-
- pv_name = pvname
- pvd = pvdata
-
- if not hasattr(self, 'cafe'):
- print("py_monitor_callback: name/handle self cafe is NONE",
- pv_name, handle)
- return
-
- self.pvd = pvd
-
- if pvd.status == self.cyca.ICAFE_CS_NEVER_CONN:
- print("initialize again")
- self.initialize()
-
- elif pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
- _alarm_severity = self.cyca.ICAFE_CA_OP_CONN_DOWN
- else:
- _alarm_severity = pvd.alarmSeverity
-
- if self.monitor_dbr_time:
- self.trigger_monitor.emit(pvd, _alarm_severity)
- elif isinstance(pvd.value[0], str):
- self.trigger_monitor_str.emit((pvd.value[0]), pvd.status,
- _alarm_severity)
- elif isinstance(pvd.value[0], int):
- self.trigger_monitor_int.emit((pvd.value[0]), pvd.status,
- _alarm_severity)
- else:
- self.trigger_monitor_float.emit(float(pvd.value[0]), pvd.status,
- _alarm_severity)
-
-
- def monitor_start(self):
- '''Initiate monitor on pv.'''
- if self.handle > 0:
- #Is monitor in waiting - now deleted with monitor_stop
- if self.notify_unison:
- self.monitor_id = self.cafe.monitorStart(
- self.handle, dbr=self.cyca.CY_DBR_TIME)
- #start with gateway supplied monitor callback handler
- elif self.monitor_callback is None:
- self.monitor_id = self.cafe.monitorStart(
- self.handle, cb=self.py_monitor_callback,
- dbr=self.cyca.CY_DBR_TIME,
- notify_milliseconds=self.notify_milliseconds)
- else:
- self.monitor_id = self.cafe.monitorStart(
- self.handle, cb=self.monitor_callback,
- dbr=self.cyca.CY_DBR_TIME,
- notify_milliseconds=self.notify_milliseconds)
-
- def monitor_stop(self):
- if self.handle > 0:
- _monID_list = self.cafe.getMonitorIDs(self.handle)
- _monID_inwaiting_list = self.cafe.getMonitorIDsInWaiting(
- self.handle)
- _monID_all = _monID_list + _monID_inwaiting_list
-
- if self.monitor_id in _monID_all:
- self.cafe.monitorStop(self.handle, self.monitor_id)
- #Is monitor in waiting?
- #remove monitors in waiting - to do
-
- def reconnect_channel(self):
- self.cafe.reconnect([self.handle]) #list
-
- def set_desc(self):
- '''Set description of pv from pv.DESC'''
-
- if self.cafe.hasDescription(self.handle):
- self.desc = self.cafe.getDescription(self.handle)
- return
- elif self.desc is not None:
- return
- else:
- self.cafe.supplementHandle(self.handle)
- if self.cafe.hasDescription(self.handle):
- self.desc = self.cafe.getDescription(self.handle)
-
- if self.desc is not None:
- return
-
- ###Back-up solution
- _found = str(self.pv_name).find(".")
- if _found != -1:
- _pv_desc = str(self.pv_name)[0:_found] +".DESC"
- else:
- _pv_desc = self.pv_name +".DESC"
- _handle_desc = self.cafe.getHandleFromPVName(_pv_desc)
-
- _handle_desc_already_open = False
-
- if _handle_desc == 0:
- self.cafe.openPrepare()
- _handle_desc = self.cafe.open(_pv_desc)
- self.cafe.openNowAndWait(self.timeout, _handle_desc)
- time.sleep(0.001)
- else:
- _handle_desc_already_open = True
-
- if self.cafe.isConnected(_handle_desc):
- self.desc = self.cafe.getCache(_handle_desc, 'str')
- if self.desc is None:
- self.desc = self.cafe.get(_handle_desc, 'str')
- else:
- self.desc = None
-
- if not _handle_desc_already_open:
- self.cafe.close(_handle_desc)
-
- def set_precision_and_units(self, reconnectFlag: bool = False):
- '''Set the pv precision and units.'''
- if self.pv_ctrl is None or reconnectFlag is True:
- self.pv_ctrl = self.cafe.getCtrlCache(self.handle)
-
- if self.pv_ctrl is not None:
- if not self.has_precision_user:
- self.precision = self.pv_ctrl.precision
- if self.pv_ctrl.units is not None:
- self.units = str(self.pv_ctrl.units)
- else:
- self.units = ""
-
- if reconnectFlag is True:
- #verify user input
- if self.show_units is True and self.suffix is not None:
- if self.suffix == self.units:
- self.show_units = False
-
-
- def _qt_readback_color_mode(self):
- '''Color mode is determined from CAFE and depends on whether the pv:
- has alarm limits (self.color_mode = 'readbackAlarm')
- or is without alarm limits (self.color_mode = 'readbackStatic')
- '''
-
- #Already set by user
- if self.color_mode is self.READBACK_ALARM:
- return
-
- if self.cafe.isConnected(self.handle):
- pvd = self.cafe.getPVCache(self.handle)
- if pvd.alarmSeverity in (self.cyca.SEV_MINOR, self.cyca.SEV_MAJOR) \
- or self.cafe.hasAlarmStatusSeverity(self.handle):
- self.color_mode = self.READBACK_ALARM
- self.status_tip = ("Widget color mode is dynamic, " +
- "pv with alarm limits")
- else:
- self.color_mode = self.READBACK_STATIC
- self.status_tip = ("Widget color mode is static, " +
- "pv without alarm limits")
-
-
- def qt_property_initial_values(self, qt_object_name: str = None,
- tool_tip: bool = True):
-
- '''Set Qt property values.'''
- self.qt_object_name = qt_object_name
- if tool_tip:
- self.setToolTip(self.pv_name)
- self.setObjectName(self.qt_object_name)
- if self.qt_object_name in self.qt_object_to_property.keys():
- self._qt_property_selected = copy.deepcopy(
- self.qt_object_to_property[self.qt_object_name])
- else:
- print("qt_property_initial_values: Object not found in dictionary")
-
- if self.cafe.isConnected(self.handle):
-
- if self.qt_object_name == self.PV_READBACK:
- self._qt_readback_color_mode()
- #self.setStatusTip(self.status_tip)
-
- elif self.qt_object_name == self.PV_CONTROLLER:
- if self.color_mode == self.ACT_ON_BEAM:
- #self.setStatusTip("PV setting acts directly on beam")
- pass
- else:
- self.color_mode = self.NOT_ACT_ON_BEAM
- #self.setStatusTip("PV setting does not influence beam")
-
- elif self.qt_object_name == self.PV_DAQ_CA:
- self._qt_readback_color_mode()
-
- elif self.qt_object_name == self.PV_DAQ_BS:
- self.color_mode = self.READBACK_STATIC
-
- self._qt_dynamic_property_set(self.color_mode)
-
- else:
- self.qt_property_disconnect()
-
-
- def qt_dynamic_property_get(self, property_state: str = None):
- '''Retrieves the requested property value
- else that which is currently true'''
-
- for _property, _value in self._qt_property_selected.items():
- if property_state is not None:
- if _property == property_state:
- return _value
- elif _value:
- return _property
-
- def _qt_dynamic_property_set(self, property_state: str = None):
- '''
- Set the Input property to true, and the remainder to False
- If None is given then all dynamic properties are set to False
- '''
-
- for _property in self._qt_property_selected.keys():
- if _property == property_state:
- self.setProperty(_property, True)
- self._qt_property_selected[_property] = True
- else:
- self.setProperty(_property, False)
- self._qt_property_selected[_property] = False
-
- def qt_property_disconnect(self):
- '''Set Qt disconnect property value.'''
- self._qt_dynamic_property_set(self.DISCONNECTED)
- self.qt_style_polish()
-
- def qt_property_reconnect(self):
- '''Set Qt connected property value.'''
-
- if self.qt_object_name == self.PV_READBACK:
- self._qt_readback_color_mode()
- #self.setStatusTip(self.status_tip)
-
-
- elif self.qt_object_name == self.PV_CONTROLLER:
- if self.color_mode == self.ACT_ON_BEAM:
- #self.setStatusTip("PV setting acts directly on beam")
- pass
- else:
- self.color_mode = self.NOT_ACT_ON_BEAM
- #self.setStatusTip("PV setting does not influence beam")
-
-
- #self._qt_property_selected =
- self._qt_dynamic_property_set(self.color_mode)
-
- self.qt_style_polish()
-
- def qt_property_alarm_sev_major(self):
- '''Set Qt MAJOR property value.'''
-
- self._qt_dynamic_property_set(self.ALARM_SEV_MAJOR)
- self.setStatusTip("{0} reports value in MAJOR alarm state!".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_alarm_sev_minor(self):
- '''Set Qt MINOR property value.'''
- self._qt_dynamic_property_set(self.ALARM_SEV_MINOR)
- self.setStatusTip("{0} reports value in MINOR alarm state!".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_alarm_sev_no_alarm(self):
- '''Set Qt READBACK_ALARM property value.'''
- #self._qt_property_selected =
- self._qt_dynamic_property_set(self.READBACK_ALARM)
- self.setStatusTip("{0} reports value in normal state".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_alarm_sev_invalid(self):
- '''Set Qt INVALID property value.'''
- self._qt_dynamic_property_set(self.ALARM_SEV_INVALID)
- self.setStatusTip("PV={0} reports an INVALID value!".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_static(self):
- '''Set Qt STATIC property value.'''
- self._qt_dynamic_property_set(self.READBACK_STATIC)
- self.setStatusTip("PV={0} does not have an alarm state".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_daq_stopped(self):
- '''Set Qt STOPPED property value.'''
- self._qt_dynamic_property_set(self.DAQ_STOPPED)
- self.setStatusTip("PV={0} reports DAQ has stopped".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_property_daq_paused(self):
- '''Set Qt STOPPED property value.'''
- self._qt_dynamic_property_set(self.DAQ_PAUSED)
- self.setStatusTip("PV={0} reports DAQ has paused".format(
- self.pv_name))
- self.qt_style_polish()
-
- def qt_style_polish(self, redraw=False):
- if redraw:
- self.style().unpolish(self)
- self.style().polish(self)
- event = QEvent(QEvent.StyleChange)
- QApplication.sendEvent(self, event)
- self.update()
- self.updateGeometry()
- else:
- self.style().polish(self)
- QApplication.processEvents()
-
- def pv_status_text_header(self, source="Channel Access"):
- _source = source
- _source_separator = "----------------------------------------"
- _text = """
-
- Widget: {0} ({1}, {2})
- """.format(self.widget_class, self.qt_object_name, self.color_mode)
-
- if self.msg_press_value is not None:
- _text += """
- On press, sends value: {0}
- """.format(self.msg_press_value, "DarkOrchid")
-
- if self.msg_release_value is not None:
- _text += """
- On release, sends value: {0}
- """.format(self.msg_release_value, "DarkOrchid")
-
- if self.pv_within_daq_group:
- if self.qt_object_name in self.PV_DAQ_BS:
- _ds_color = "Navy Blue"
- else:
- _ds_color = "Black"
- else:
- _ds_color = "Black"
-
- _text += """
- {0}
- Data source: {1}
- {0}
- PV: {2}
- """.format(_source_separator, _source, self.pv_name, "DarkOrchid",
- _ds_color)
-
- if self.desc is None:
- self.set_desc()
-
- if self.desc == "":
- _text += """
- """
- return _text
-
- _text += """
-
- Description: {0}
-
- """.format(self.desc, "DarkOrchid")
-
- return _text
-
-
- def pv_status_text_enum(self):
-
- _val_enum = None
- _value = self.pvd.value[0]
- if isinstance(_value, str):
- _val_enum = self.cafe.getEnumFromString(self.handle, _value)
- elif _value is not None:
- _val_enum = self.cafe.getStringFromEnum(self.handle, _value)
-
- _color = "Blue"
-
- #To catch case where channel is called by user
-
-
- #To catch DAQ case
- if self.pv_within_daq_group:
- if self.qt_object_name in self.PV_DAQ_BS:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DAQ_PAUSED,
- self.DISCONNECTED):
- _color = "White"
- elif self.qt_object_name in self.PV_DAQ_CA:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DISCONNECTED):
- _color = "White"
-
- elif not self.cafe.isConnected(self.handle):
- _color = "White"
- elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
- _color = "White"
-
- _text = """
-
- Value: {1} [{2}]
- """.format(_color, _value, _val_enum)
-
- return _text
-
- def pv_status_text_data(self):
-
- _value_str = ""
- _first_end = 9
- _end_range = min(self.pvd.nelem, _first_end)
- if _end_range > 1:
- _value_str = "[ "
- for i in range(0, _end_range):
- _value = self.pvd.value[i]
- if _value is None:
- _value = '0'
- if isinstance(_value, str):
- _value_str += _value
- elif isinstance(_value, int):
- _value_str += str(_value)
- else:
- if self.pv_ctrl is not None:
- _value_form = ("{:<.%sf}" % self.pv_ctrl.precision)
- _value_str += _value_form.format(
- round(_value, self.pv_ctrl.precision))
- if i < (_end_range-1):
- _value_str += " "
-
- if self.pvd.nelem > _first_end:
- _value_str += " ... "
- _value = self.pvd.value[self.pvd.nelem-1]
- if isinstance(_value, str):
- _value_str += _value
- elif isinstance(_value, int):
- _value_str += str(_value)
- else:
- if self.pv_ctrl is not None:
- _value_form = ("{:<.%sf}" % self.pv_ctrl.precision)
- _value_str += _value_form.format(
- round(_value, self.pv_ctrl.precision))
- _value_str += " "
- if _end_range > 1:
- _value_str += "]"
-
- _color = "Blue"
-
-
- #To catch DAQ case
- if self.pv_within_daq_group:
-
- if self.qt_object_name in self.PV_DAQ_BS:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DAQ_PAUSED,
- self.DISCONNECTED):
- _color = "White"
- elif self.qt_object_name in self.PV_DAQ_CA:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DISCONNECTED):
- _color = "White"
-
- elif not self.cafe.isConnected(self.handle):
- _color = "White"
- elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
- _color = "White"
-
- _text = """
-
- Value: {1} {2}
- """.format(_color, _value_str, self.units)
-
- return _text
-
-
- def pv_status_text_timestamp(self):
- _status_not_ok_color = "IndianRed"
- _status_ok_color = "DimGray"
- _ts_color = "Blue"
- _color = _status_ok_color
-
- #To catch DAQ case
- if self.pv_within_daq_group:
- if self.qt_object_name in self.PV_DAQ_BS:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DAQ_PAUSED,
- self.DISCONNECTED):
- _ts_color = "White"
- _color = "White"
- elif self.qt_object_name in self.PV_DAQ_CA:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DISCONNECTED):
- _ts_color = "White"
- _color = "White"
-
- elif not self.cafe.isConnected(self.handle):
- _ts_color = "White"
- elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
- _ts_color = "White"
-
- if self.pvd.status != self.cyca.ICAFE_NORMAL:
- _color = _status_not_ok_color
- _text = """
- Timestamp: {2}
- Status: {3}
{4}
- """.format(_ts_color, _color, self.pvd.tsDateAsString,
- self.pvd.statusAsString,
- self.cafe.getStatusInfo(self.pvd.status))
-
- return _text
-
-
- def pv_status_text_alarm(self):
- _text = """
- """
- _color = "DimGray"
-
- #To catch DAQ case
- if self.pv_within_daq_group:
-
- if self.pvd.alarmSeverity == self.cyca.SEV_MINOR:
- _color = "Yellow"
- elif self.pvd.alarmSeverity == self.cyca.SEV_MAJOR:
- _color = "Red"
- elif self.pvd.alarmSeverity == self.cyca.SEV_INVALID:
- _color = "White"
-
- if self.qt_object_name in self.PV_DAQ_BS:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DAQ_PAUSED,
- self.DISCONNECTED):
- _color = "White"
- elif self.qt_object_name in self.PV_DAQ_CA:
- if self.qt_dynamic_property_get() in (self.DAQ_STOPPED,
- self.DISCONNECTED):
- _color = "White"
-
-
- elif not self.cafe.isConnected(self.handle):
- _color = "White"
-
- elif self.pvd.status == self.cyca.ICAFE_CA_OP_CONN_DOWN:
- _color = "White"
-
- elif self.pvd.alarmSeverity == self.cyca.SEV_MINOR:
- _color = "Yellow"
- elif self.pvd.alarmSeverity == self.cyca.SEV_MAJOR:
- _color = "Red"
- elif self.pvd.alarmSeverity == self.cyca.SEV_INVALID:
- _color = "White"
-
- _text += """
- Alarm status: {1}
- Alarm severity: {2}
- """.format(_color, self.pvd.alarmStatusAsString,
- self.pvd.alarmSeverityAsString)
-
- return _text
-
- def pv_access(self):
- _accessIs = ""
- if self.pv_info is None:
- self.pv_info = self.cafe.getChannelInfo(self.handle)
- if self.pv_info.accessRead:
- _accessIs += "Read"
- if self.pv_info.accessWrite:
- _accessIs += "Write"
- return _accessIs
-
- def pv_status_text_enum_metadata(self):
- _text = """
- ENUM strings: {2}
- Data type (native): {3}
- Record type: {4}
- RW Access: {5}
- IOC: {6}
- """.format("MediumBlue", "DarkOrchid", self.pvc.enumStrings,
- self.pv_info.dataTypeAsString,
- self.record_type, self.pv_access(),
- self.pv_info.hostName)
- return _text
-
- def pv_status_text_metadata(self):
-
- if self.pv_info is None:
- self.pv_info = self.cafe.getChannelInfo(self.handle)
- if self.pv_info is not None and self.record_type is None:
- if "Not Supported" in self.pv_info.className:
- _rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
- self.record_type = _rtype if _rtype is not None else \
- self.pv_info.className
- self.cafe.close(self.pv_name.split(".")[0] + ".RTYP")
- else:
- self.record_type = self.pv_info.className
-
- if self.record_type in ["stringin", "stringout"]:
- _text = """
- Data type (native): {1}
- Record type: {2}
- RW Access: {3}
- IOC: {4}
- """.format("MediumBlue", self.pv_info.dataTypeAsString,
- self.record_type, self.pv_access(),
- self.pv_info.hostName)
- return _text
-
- _text = """
- """
- if self.pvd.nelem > 1:
- _text += """
- Nelem: {1}
- """.format("MediumBlue", self.pvd.nelem)
-
- _text += """
- Precision (PV): {1}
- Data type (native): {2}
- Record type: {3}
- RW Access: {4}
- IOC: {5}
- """.format("MediumBlue", self.pvc.precision,
- self.pv_info.dataTypeAsString,
- self.record_type, self.pv_access(),
- self.pv_info.hostName)
- return _text
-
-
- def pv_status_text_alarm_limits(self):
-
- if self.pv_info is None:
- self.pv_info = self.cafe.getChannelInfo(self.handle)
- if self.pv_info is not None and self.record_type is None:
- if "Not Supported" in self.pv_info.className:
- _rtype = self.cafe.get(self.pv_name.split(".")[0] + ".RTYP")
- self.record_type = _rtype if _rtype is not None else \
- self.pv_info.className
- self.cafe.close(self.pv_name.split(".")[0] + ".RTYP")
- else:
- self.record_type = self.pv_info.className
-
- _text = """
- """
-
- #No all record types have alarms
- #className is not supported at psi since introduction of the
- #linux ca gateway
- #Not Supported by Gateway
-
- if "Not Supported" in str(self.record_type):
- pass
- elif self.record_type not in self._alarm_severity_record_types:
- return _text
-
- if self.pvc.lowerAlarmLimit == 0 and self.pvc.upperAlarmLimit == 0 and \
- self.pvc.lowerWarningLimit == 0 and self.pvc.upperWarningLimit == 0:
- return _text
-
- if self.cafe.hasAlarmStatusSeverity(self.handle):
- _text = """
- Lower/Upper alarm limit:
- {1} / {4}
- Lower/Upper warning limit:
- {2} / {3}
-
- """.format("MediumBlue",
- self.pvc.lowerAlarmLimit, self.pvc.lowerWarningLimit,
- self.pvc.upperWarningLimit, self.pvc.upperAlarmLimit)
- return _text
-
- def pv_status_text_display_limits(self):
- _text = """
- """
- if self.pvc.lowerDisplayLimit == 0 and \
- self.pvc.upperDisplayLimit == 0 and \
- self.pvc.lowerControlLimit == 0 and self.pvc.upperControlLimit == 0:
- return _text
- _text = """
- Lower/Upper control limit:
- {3} / {4}
- Lower/Upper display limit:
- {1} / {2}
-
- """.format("MediumBlue",
- self.pvc.lowerDisplayLimit, self.pvc.upperDisplayLimit,
- self.pvc.lowerControlLimit, self.pvc.upperControlLimit)
- return _text
-
-
- def pv_status_text(self):
- '''pv metadata to accompany widget's dialog box.'''
- QApplication.processEvents()
- _source = "Channel Access"
-
- if self.pv_within_daq_group:
- if self.qt_object_name == self.PV_DAQ_BS:
- _source = "DAQ (Beam Synchronous)"
- #self.pvd written to in receive_daq_update
- elif self.qt_object_name == self.PV_DAQ_CA:
- _source = "DAQ (Channel Access)"
- self.pvd = self.cafe.getPVCache(self.handle)
- if self.pvd.pulseID > 0:
- _source += "
Pulse ID: {0}".format(self.pvd.pulseID)
- else:
- self.pvd = self.cafe.getPVCache(self.handle)
-
- self.pvc = self.cafe.getCtrlCache(self.handle)
-
- _text_data = """
- """
- if self.pvd.status == self.cyca.ECAFE_INVALID_HANDLE:
- _text_data = """ Status: {1}
{2}
- """.format("Blue",
- "Channel closed while DAQ in STOP state.",
- ("PV info requires DAQ to be in " +
- "RUN/PAUSED state"))
-
-
- elif self.pvd.status == self.cyca.ICAFE_CS_NEVER_CONN:
- _text_data = """ Status: {1}
{2}
- """.format("Red", self.pvd.statusAsString,
- self.cafe.getStatusInfo(self.pvd.status))
-
- elif self.pvc.noEnumStrings > 0:
- _text_data = (self.pv_status_text_enum() +
- self.pv_status_text_timestamp() +
- self.pv_status_text_alarm() +
- self.pv_status_text_enum_metadata())
-
- else:
- _text_data = (self.pv_status_text_data() +
- self.pv_status_text_timestamp() +
- self.pv_status_text_alarm() +
- self.pv_status_text_metadata() +
- self.pv_status_text_alarm_limits() +
- self.pv_status_text_display_limits())
-
- self.pv_message_in_a_box.setText(
- self.pv_status_text_header(source=_source) + _text_data
- )
- QApplication.processEvents()
- self.pv_message_in_a_box.exec()
-
-
- def lookup_archiver(self):
- '''Plot pvdata from archiver.'''
- #"https://ui-data-api.psi.ch/prepare?
- #channel=sf-archiverappliance/"
- urlIs = self.url_archiver
- urlIs = urlIs + self.pv_name
-
- if not QDesktopServices.openUrl(QUrl(urlIs)):
- print("URL FOR ARCHIVER NOT FOUND", urlIs)
- #if self.show_log_message is not None:
- # self.show_log_message(MsgSeverity.ERROR, __pymodule__, _line(),
- # "Failed to open URL {0}".format(urlIs))
-
- def lookup_databuffer(self):
- '''Plot beam synchronous pvdata from databuffer.'''
- #""https://ui-data-api.psi.ch/prepare?channel = sf-databuffer/"
- urlIs = self.url_databuffer
- urlIs = urlIs + self.pv_name
-
- if not QDesktopServices.openUrl(QUrl(urlIs)):
- print("URL FOR DATA BUFFER NOT FOUND", urlIs)
- #if self.show_log_message is not None:
- # self.show_log_message(MsgSeverity.ERROR, __pymodule__, _line(),
- # "Failed to open URL {0}".format(urlIs))
- QApplication.processEvents()
-
- def strip_chart(self):
- '''PShell strip chart.'''
- configStr = ("-config = [[[true,\"" + self.pv_name +
- "\",\"Channel\",1,1]]]")
- commandStr = "/sf/op/bin/strip_chart"
- argStr = ["-nlaf", "-start", configStr, "&"]
- QProcess.startDetached(commandStr, argStr)
-
-
- def display_parameters(self):
- display_wgt = QDialog(self)
-
- _rect = display_wgt.geometry() #
- _parentRect = self.context_menu.geometry()
-
- _rect.moveTo(display_wgt.mapToGlobal(
- QPoint(_parentRect.x() + _parentRect.width() - _rect.width(),
- _parentRect.y())))
-
- display_wgt.setGeometry(_rect)
- display_wgt.setWindowTitle(self.pv_name)
- layout = QVBoxLayout()
-
- precision_flag = True
- if self.pv_ctrl is not None:
- if self.pv_ctrl.precision <= 0:
- precision_flag = False
-
- if self.cafe.getDataTypeNative(self.handle) in (
- self.cyca.CY_DBR_FLOAT,
- self.cyca.CY_DBR_DOUBLE) and precision_flag:
- #precision user
- _hbox_wgt = QWidget()
- _hbox = QHBoxLayout()
- precision_user_label = QLabel("Precision (user):")
- self.precision_user_wgt = QSpinBox(self)
- self.precision_user_wgt.setFocusPolicy(Qt.NoFocus)
- self.precision_user_wgt.setValue(int(self.precision))
- if self.pv_ctrl is not None:
- _max = self.pv_ctrl.precision
- else:
- _max = 6
- self.precision_user_wgt.setMaximum(_max)
- self.precision_user_wgt.valueChanged.connect(
- self.precision_user_changed)
- _hbox.addWidget(precision_user_label)
- _hbox.addWidget(self.precision_user_wgt)
- _hbox_wgt.setLayout(_hbox)
-
- precision_user_label.setFixedWidth(110)
- self.precision_user_wgt.setFixedWidth(35)
- _hbox_wgt.setFixedWidth(160)
-
- #precision ioc
- _hbox2_wgt = QWidget()
- _hbox2 = QHBoxLayout()
- precision_ioc_label = QLabel("Precision (ioc): ")
- precision_ioc = QPushButton(self)
- precision_ioc.setText(" {} ".format(_max))
- precision_ioc.clicked.connect(self.precision_ioc_reset)
-
- _hbox2.addWidget(precision_ioc_label)
- _hbox2.addWidget(precision_ioc)
- _hbox2_wgt.setLayout(_hbox2)
-
- precision_ioc_label.setFixedWidth(110)
- precision_ioc.setFixedWidth(20)
- _hbox2_wgt.setFixedWidth(145)
-
- layout.addWidget(_hbox_wgt)
- layout.addWidget(_hbox2_wgt)
-
- #precision refresh rate
- _hbox3_wgt = QWidget()
- _hbox3 = QHBoxLayout()
- refresh_freq_label = QLabel("Refresh rate: ")
- _default_refresh_val = 0 if self.notify_freq_hz_default <= 0 else \
- self.notify_freq_hz_default
-
- self.refresh_freq_combox_idx_dict = {0: 0, 1: 10, 2: 5, 3: 2, 4: 1,
- 5: 0.5, 6: _default_refresh_val}
- refresh_freq = QComboBox(self)
- refresh_freq.addItem('direct')
- refresh_freq.addItem('{0} Hz'.format(
- self.refresh_freq_combox_idx_dict[1]))
- refresh_freq.addItem('{0} Hz'.format(
- self.refresh_freq_combox_idx_dict[2]))
- refresh_freq.addItem('{0} Hz'.format(
- self.refresh_freq_combox_idx_dict[3]))
- refresh_freq.addItem('{0} Hz'.format(
- self.refresh_freq_combox_idx_dict[4]))
- refresh_freq.addItem('{0} Hz'.format(
- self.refresh_freq_combox_idx_dict[5]))
-
- _default_text = 'default (direct)' if _default_refresh_val == 0 else \
- 'default ({0} Hz)'.format(self.refresh_freq_combox_idx_dict[6])
-
- refresh_freq.addItem(_default_text)
-
- for key, value in self.refresh_freq_combox_idx_dict.items():
- if value == self.notify_freq_hz:
- refresh_freq.setCurrentIndex(key)
- break
- refresh_freq.currentIndexChanged.connect(self.refresh_rate_changed)
-
- _hbox3.addWidget(refresh_freq_label)
- _hbox3.addWidget(refresh_freq)
- _hbox3_wgt.setLayout(_hbox3)
-
- refresh_freq_label.setFixedWidth(110)
- refresh_freq.setFixedWidth(115)
- _hbox3_wgt.setFixedWidth(235)
-
- layout.addWidget(_hbox3_wgt)
-
- layout.setAlignment(Qt.AlignLeft)
- layout.setContentsMargins(10, 0, 0, 0)
- layout.setSpacing(0)
-
- display_wgt.setMinimumWidth(340)
- display_wgt.setLayout(layout)
-
- display_wgt.exec()
- QApplication.processEvents()
-
- def precision_ioc_reset(self):
- if self.pv_ctrl is not None:
- self.precision_user = self.pv_ctrl.precision
- self.precision = self.pv_ctrl.precision
- if self.precision is not None:
- self.precision_user_wgt.setValue(self.precision)
-
- def precision_user_changed(self, new_value):
- self.precision_user = new_value
- self.precision = new_value
-
- _pvd = self.cafe.getPVCache(self.handle)
-
- if _pvd.value[0] is not None:
- if isinstance(_pvd.value[0], float):
- self.trigger_monitor_float.emit(
- _pvd.value[0], _pvd.status, _pvd.alarmSeverity)
-
- def refresh_rate_changed(self, new_idx):
- _notify_freq_hz = self.refresh_freq_combox_idx_dict[new_idx]
- self.notify_milliseconds = 0 if _notify_freq_hz == 0 else \
- 1000 / _notify_freq_hz
- self.notify_freq_hz = _notify_freq_hz
-
- if self.notify_unison:
- self.notify_unison = False
- self.monitor_stop()
- self.monitor_start()
-
- else:
- self.cafe.updateMonitorPolicyDeltaMS(
- self.handle, self.monitor_id, self.notify_milliseconds)
-
- #https://doc.qt.io/qt-5.9/qtwidgets-mainwindows-menus-example.html
- #Since Qt5 this has to be implemented in order to avoid the Select
- #All dialogue button appearing..
- def contextMenuEvent(self, event):
- return
-
- def showContextMenu(self):
- self.context_menu.exec(QCursor.pos())
-
- def mousePressEvent(self, event):
- '''Action on mouse press event.'''
- button = event.button()
- if button == Qt.RightButton:
- self.context_menu.exec(QCursor.pos())
- self.clearFocus()
-
- def mouseReleaseEvent(self, event):
- event.ignore()
-