mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-14 03:31:50 +02:00
fix: producer->connector, remove unused '_rpc_update_handler' static method
This commit is contained in:
@ -2,11 +2,12 @@ import importlib
|
|||||||
import select
|
import select
|
||||||
import subprocess
|
import subprocess
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
from bec_lib import MessageEndpoints, messages
|
|
||||||
|
|
||||||
import bec_widgets.cli.client as client
|
import bec_widgets.cli.client as client
|
||||||
|
from bec_lib import MessageEndpoints, messages
|
||||||
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
from bec_widgets.utils.bec_dispatcher import BECDispatcher
|
||||||
|
|
||||||
|
|
||||||
@ -104,7 +105,7 @@ class RPCBase:
|
|||||||
"""
|
"""
|
||||||
parent = self
|
parent = self
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
while not parent._parent is None:
|
while parent._parent is not None:
|
||||||
parent = parent._parent
|
parent = parent._parent
|
||||||
return parent
|
return parent
|
||||||
|
|
||||||
@ -130,7 +131,7 @@ class RPCBase:
|
|||||||
print(f"RPCBase: {rpc_msg}")
|
print(f"RPCBase: {rpc_msg}")
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
receiver = self._root._gui_id
|
receiver = self._root._gui_id
|
||||||
self._client.producer.send(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
self._client.connector.send(MessageEndpoints.gui_instructions(receiver), rpc_msg)
|
||||||
|
|
||||||
if not wait_for_rpc_response:
|
if not wait_for_rpc_response:
|
||||||
return None
|
return None
|
||||||
@ -166,7 +167,7 @@ class RPCBase:
|
|||||||
"""
|
"""
|
||||||
response = None
|
response = None
|
||||||
while response is None:
|
while response is None:
|
||||||
response = self._client.producer.get(
|
response = self._client.connector.get(
|
||||||
MessageEndpoints.gui_instruction_response(request_id)
|
MessageEndpoints.gui_instruction_response(request_id)
|
||||||
)
|
)
|
||||||
return response
|
return response
|
||||||
|
@ -27,25 +27,22 @@ class BECWidgetsCLIServer:
|
|||||||
"""Start the figure window."""
|
"""Start the figure window."""
|
||||||
self.fig.start()
|
self.fig.start()
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _rpc_update_handler(msg, parent):
|
|
||||||
parent.on_rpc_update(msg.value)
|
|
||||||
|
|
||||||
def on_rpc_update(self, msg: dict, metadata: dict):
|
def on_rpc_update(self, msg: dict, metadata: dict):
|
||||||
|
request_id = metadata.get("request_id")
|
||||||
try:
|
try:
|
||||||
method = msg["action"]
|
method = msg["action"]
|
||||||
args = msg["parameter"].get("args", [])
|
args = msg["parameter"].get("args", [])
|
||||||
kwargs = msg["parameter"].get("kwargs", {})
|
kwargs = msg["parameter"].get("kwargs", {})
|
||||||
request_id = metadata.get("request_id")
|
|
||||||
obj = self.get_object_from_config(msg["parameter"])
|
obj = self.get_object_from_config(msg["parameter"])
|
||||||
res = self.run_rpc(obj, method, args, kwargs)
|
res = self.run_rpc(obj, method, args, kwargs)
|
||||||
self.send_response(request_id, True, {"result": res})
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
self.send_response(request_id, False, {"error": str(e)})
|
self.send_response(request_id, False, {"error": str(e)})
|
||||||
|
else:
|
||||||
|
self.send_response(request_id, True, {"result": res})
|
||||||
|
|
||||||
def send_response(self, request_id: str, accepted: bool, msg: dict):
|
def send_response(self, request_id: str, accepted: bool, msg: dict):
|
||||||
self.client.producer.set(
|
self.client.connector.set(
|
||||||
MessageEndpoints.gui_instruction_response(request_id),
|
MessageEndpoints.gui_instruction_response(request_id),
|
||||||
messages.RequestResponseMessage(accepted=accepted, message=msg),
|
messages.RequestResponseMessage(accepted=accepted, message=msg),
|
||||||
expire=60,
|
expire=60,
|
||||||
|
@ -2,7 +2,6 @@ from bec_lib import messages, MessageEndpoints, RedisConnector
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
connector = RedisConnector("localhost:6379")
|
connector = RedisConnector("localhost:6379")
|
||||||
producer = connector.producer()
|
|
||||||
metadata = {}
|
metadata = {}
|
||||||
|
|
||||||
scanID = "ScanID1"
|
scanID = "ScanID1"
|
||||||
@ -20,9 +19,7 @@ for ii in range(20):
|
|||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
).dumps()
|
).dumps()
|
||||||
|
|
||||||
# producer.send(topic=MessageEndpoints.device_status(device="mca"), msg=msg)
|
connector.xadd(
|
||||||
|
|
||||||
producer.xadd(
|
|
||||||
topic=MessageEndpoints.device_async_readback(
|
topic=MessageEndpoints.device_async_readback(
|
||||||
scanID=scanID, device="mca"
|
scanID=scanID, device="mca"
|
||||||
), # scanID will be different for each scan
|
), # scanID will be different for each scan
|
||||||
|
@ -40,7 +40,7 @@ class StreamPlot(QtWidgets.QWidget):
|
|||||||
uic.loadUi(os.path.join(current_path, "line_plot.ui"), self)
|
uic.loadUi(os.path.join(current_path, "line_plot.ui"), self)
|
||||||
|
|
||||||
self._idle_time = 100
|
self._idle_time = 100
|
||||||
self.producer = RedisConnector(["localhost:6379"]).producer()
|
self.connector = RedisConnector(["localhost:6379"])
|
||||||
|
|
||||||
self.y_value_list = y_value_list
|
self.y_value_list = y_value_list
|
||||||
self.previous_y_value_list = None
|
self.previous_y_value_list = None
|
||||||
@ -214,7 +214,7 @@ class StreamPlot(QtWidgets.QWidget):
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
msg = messages.DeviceMessage(signals=return_dict).dumps()
|
msg = messages.DeviceMessage(signals=return_dict).dumps()
|
||||||
self.producer.set_and_publish("px_stream/gui_event", msg=msg)
|
self.connector.set_and_publish("px_stream/gui_event", msg=msg)
|
||||||
self.roi_signal.emit(region)
|
self.roi_signal.emit(region)
|
||||||
|
|
||||||
def init_table(self):
|
def init_table(self):
|
||||||
@ -270,7 +270,7 @@ class StreamPlot(QtWidgets.QWidget):
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
continue
|
continue
|
||||||
endpoint = f"px_stream/projection_{self._current_proj}/data"
|
endpoint = f"px_stream/projection_{self._current_proj}/data"
|
||||||
msgs = self.client.producer.lrange(topic=endpoint, start=-1, end=-1)
|
msgs = self.client.connector.lrange(topic=endpoint, start=-1, end=-1)
|
||||||
data = msgs
|
data = msgs
|
||||||
if not data:
|
if not data:
|
||||||
continue
|
continue
|
||||||
@ -295,7 +295,7 @@ class StreamPlot(QtWidgets.QWidget):
|
|||||||
def new_proj(self, content: dict, _metadata: dict):
|
def new_proj(self, content: dict, _metadata: dict):
|
||||||
proj_nr = content["signals"]["proj_nr"]
|
proj_nr = content["signals"]["proj_nr"]
|
||||||
endpoint = f"px_stream/projection_{proj_nr}/metadata"
|
endpoint = f"px_stream/projection_{proj_nr}/metadata"
|
||||||
msg_raw = self.client.producer.get(topic=endpoint)
|
msg_raw = self.client.connector.get(topic=endpoint)
|
||||||
msg = messages.DeviceMessage.loads(msg_raw)
|
msg = messages.DeviceMessage.loads(msg_raw)
|
||||||
self._current_q = msg.content["signals"]["q"]
|
self._current_q = msg.content["signals"]["q"]
|
||||||
self._current_norm = msg.content["signals"]["norm_sum"]
|
self._current_norm = msg.content["signals"]["norm_sum"]
|
||||||
|
Reference in New Issue
Block a user