cleanup
This commit is contained in:
@ -30,8 +30,7 @@ class MainWindow(QMainWindow):
|
||||
redis_connector = RedisConnector(redis_url)
|
||||
connector = BECConnector(redis_connector)
|
||||
data_access = DataAccess(connector)
|
||||
|
||||
|
||||
|
||||
|
||||
if offline:
|
||||
title = f"{title} (offline)"
|
||||
|
@ -1,6 +1,6 @@
|
||||
from os import listdir
|
||||
from os.path import isfile, join
|
||||
|
||||
import h5py
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import defaultdict, deque
|
||||
@ -10,6 +10,7 @@ from collections import deque
|
||||
from PyQt5.QtCore import QObject, pyqtSignal
|
||||
from bec_utils import BECMessage
|
||||
|
||||
|
||||
pyqtWrapperType = type(QObject)
|
||||
|
||||
|
||||
@ -18,104 +19,33 @@ class FinalMeta(pyqtWrapperType, ABC):
|
||||
|
||||
|
||||
class ConnectorBase(QObject, metaclass=FinalMeta):
|
||||
new_monitor_value = pyqtSignal(str)
|
||||
msg_update = pyqtSignal(BECMessage.ScanStatusMessage)
|
||||
msg_queue_update = pyqtSignal(BECMessage.ScanQueueMessage)
|
||||
msg_scan_update = pyqtSignal(list)
|
||||
storage_device = defaultdict(lambda: deque(maxlen=5000))
|
||||
current_devices = []
|
||||
|
||||
@abstractmethod
|
||||
def start_scan_sub(self):
|
||||
""" """
|
||||
|
||||
@abstractmethod
|
||||
def start_monitor_sub(self):
|
||||
""" """
|
||||
|
||||
@abstractmethod
|
||||
def start_scan_msg_sub(self):
|
||||
""" """
|
||||
|
||||
@abstractmethod
|
||||
def start_queue_msg_sub(self):
|
||||
""" """
|
||||
|
||||
|
||||
class BECConnector(ConnectorBase):
|
||||
new_monitor_value = pyqtSignal(str)
|
||||
msg_update = pyqtSignal(BECMessage.ScanStatusMessage)
|
||||
msg_scan_update = pyqtSignal(list)
|
||||
msg_queue_update = pyqtSignal(BECMessage.ScanQueueMessage)
|
||||
|
||||
|
||||
def __init__(self, redis_connector) -> None:
|
||||
super().__init__()
|
||||
self.connector = redis_connector
|
||||
self.producer = self.connector.producer()
|
||||
self._scan_sub = None
|
||||
self._monitor_sub = None
|
||||
self._scan_msg_sub = None
|
||||
self._queue_msg_sub = None
|
||||
self.storage_device = defaultdict(lambda: deque(maxlen=5000))
|
||||
|
||||
|
||||
def start_scan_sub(self):
|
||||
self._scan_sub = self.connector.consumer(
|
||||
MessageEndpoints.scan_status(), cb=self.scan_status_update, parent=self
|
||||
)
|
||||
self._scan_sub.start()
|
||||
|
||||
def start_scan_msg_sub(self):
|
||||
self._scan_msg_sub = self.connector.consumer(
|
||||
MessageEndpoints.scan_segment(), cb=self.scan_msg_update, parent=self
|
||||
)
|
||||
self._scan_msg_sub.start()
|
||||
|
||||
def start_queue_msg_sub(self):
|
||||
self._queue_msg_sub = self.connector.consumer(
|
||||
MessageEndpoints.scan_queue_request(), cb=self.queue_msg_update, parent=self
|
||||
)
|
||||
self._queue_msg_sub.start()
|
||||
|
||||
@staticmethod
|
||||
def queue_msg_update(msg, parent):
|
||||
print("queue_msg_update", msg)
|
||||
|
||||
msg = BECMessage.ScanQueueMessage.loads(msg.value)
|
||||
parent.msg_queue_update.emit(msg)
|
||||
|
||||
@staticmethod
|
||||
def scan_msg_update(msg, parent):
|
||||
# print("scan_msg_update", msg)
|
||||
msg = BECMessage.ScanMessage.loads(msg.value)
|
||||
parent.msg_scan_update.emit(msg)
|
||||
|
||||
@staticmethod
|
||||
def scan_status_update(msg, parent):
|
||||
print("scan_status_update", msg)
|
||||
msg = BECMessage.ScanStatusMessage.loads(msg.value)
|
||||
parent.msg_update.emit(msg)
|
||||
|
||||
def start_monitor_sub(self):
|
||||
self._monitor_sub = self.connector.consumer(
|
||||
pattern=MessageEndpoints.device_readback("*"),
|
||||
cb=self.monitor_update,
|
||||
parent=self,
|
||||
)
|
||||
self._monitor_sub.start()
|
||||
|
||||
@staticmethod
|
||||
def monitor_update(msg, parent):
|
||||
dev = msg.topic.decode().split(
|
||||
MessageEndpoints._device_readback + "/")[-1].split(":sub")[0]
|
||||
msg = BECMessage.DeviceMessage.loads(msg.value)
|
||||
parent.append_device_reading(msg, dev)
|
||||
|
||||
def append_device_reading(self, msg: BECMessage.DeviceMessage, device_name: str):
|
||||
signals = msg.content["signals"]
|
||||
self.storage_device[device_name].append(signals)
|
||||
self.new_monitor_value.emit(device_name)
|
||||
|
||||
|
||||
class DataAccess(QObject):
|
||||
new_mon_value = pyqtSignal(str)
|
||||
used_devices_updated = pyqtSignal(bool)
|
||||
@ -128,19 +58,13 @@ class DataAccess(QObject):
|
||||
super().__init__()
|
||||
self.connector = connector
|
||||
self.start_all_subscriptions(self.connector)
|
||||
self.device_storage = self.connector.storage_device
|
||||
|
||||
self.used_devices = []
|
||||
|
||||
self.scan_nr = ""
|
||||
self.positions = None
|
||||
self.used_motors = []
|
||||
self.connect_pyqtSignals()
|
||||
print('data_access initialized')
|
||||
|
||||
def start_all_subscriptions(self, conn):
|
||||
conn.start_scan_sub()
|
||||
conn.start_monitor_sub()
|
||||
conn.start_scan_msg_sub()
|
||||
conn.start_queue_msg_sub()
|
||||
|
||||
def on_msg_update(self, msg: BECMessage.ScanStatusMessage):
|
||||
"""
|
||||
@ -148,31 +72,20 @@ class DataAccess(QObject):
|
||||
Args:
|
||||
msg (BECMessage.ScanStatusMessage): A scan status message
|
||||
"""
|
||||
print("on_msg_update called")
|
||||
if msg.content["status"] == "open":
|
||||
self.get_scan_info(msg)
|
||||
# self.reset_plot_data()
|
||||
self.new_scan_started.emit(True)
|
||||
else:
|
||||
print("scan ended")
|
||||
|
||||
# def on_msg_scan_update(self, message):
|
||||
# """
|
||||
# This fcn runs whenever there is a new message from the device, could be more than one scan at the time.
|
||||
# Args:
|
||||
# message (_type_): _description_
|
||||
# """
|
||||
# if message[0].content["point_id"] == 0:
|
||||
# self.update_used_devices(message)
|
||||
# self.update_all_device_names()
|
||||
|
||||
# for scan_segment_mess in message:
|
||||
# self.new_scan_msg.emit(scan_segment_mess)
|
||||
def get_scan_info(self, msg):
|
||||
print(
|
||||
"--------------------------------------------------------------------------------------------"
|
||||
)
|
||||
print("New scan started")
|
||||
self.used_motors = msg.content["info"]["primary"]
|
||||
self.scan_nr = msg.content["info"]["scan_number"]
|
||||
self.positions = msg.content["info"]["positions"]
|
||||
print(
|
||||
"--------------------------------------------------------------------------------------------"
|
||||
)
|
||||
@ -181,23 +94,39 @@ class DataAccess(QObject):
|
||||
self.connector.msg_update.connect(self.on_msg_update)
|
||||
|
||||
|
||||
def get_files():
|
||||
def get_files(path):
|
||||
|
||||
mypath = "/home/stalbe_j/Documents/grum"
|
||||
return [f for f in listdir(mypath) if isfile(join(mypath, f))]
|
||||
return [f for f in listdir(path) if isfile(join(path, f))]
|
||||
|
||||
|
||||
def plot_recon():
|
||||
files = get_files()
|
||||
recon_files = [file for file in files if file.endswith("h5")]
|
||||
for file in recon_files:
|
||||
print(file)
|
||||
# print keys...
|
||||
scan = "S06755"
|
||||
mypath = "/sls/X12SA/data/e20632/Data10/analysis/S06000-06999/" + scan
|
||||
|
||||
# i = 1
|
||||
files = get_files(mypath)
|
||||
# print("files:", files)
|
||||
recon_files = [file for file in files if file.endswith("recons.h5")]
|
||||
|
||||
if recon_files[0]:
|
||||
hf = h5py.File(mypath + '/' + recon_files[0],'r')
|
||||
print(hf.keys())
|
||||
|
||||
# measurement = hf.get('measurement')
|
||||
recon = hf.get('reconstruction')
|
||||
# print(measurement.keys())
|
||||
print(recon.keys())
|
||||
recon_object = hf.get('reconstruction/object')
|
||||
recon_probes = hf.get('reconstruction/probes')
|
||||
print(recon_object)
|
||||
print(recon_probes)
|
||||
|
||||
# import matplotlib.pyplot as plt
|
||||
# plt.imshow(recon_object)
|
||||
|
||||
|
||||
# while True:
|
||||
|
||||
# print("plot_recon")
|
||||
# self.new_plot("recon"+str(i), {'xs':[1*i,2*1,3*1], 'ys':[3*i,4*i,5*i]})
|
||||
# time.sleep(10)
|
||||
# i+=1
|
||||
|
||||
|
Reference in New Issue
Block a user