diff --git a/bec_widgets/cli/client_utils.py b/bec_widgets/cli/client_utils.py index 49c51732..c7be0abb 100644 --- a/bec_widgets/cli/client_utils.py +++ b/bec_widgets/cli/client_utils.py @@ -2,11 +2,12 @@ import importlib import select import subprocess import uuid + from functools import wraps -from bec_lib import MessageEndpoints, messages import bec_widgets.cli.client as client +from bec_lib import MessageEndpoints, messages from bec_widgets.utils.bec_dispatcher import BECDispatcher @@ -104,7 +105,7 @@ class RPCBase: """ parent = self # pylint: disable=protected-access - while not parent._parent is None: + while parent._parent is not None: parent = parent._parent return parent @@ -130,7 +131,7 @@ class RPCBase: print(f"RPCBase: {rpc_msg}") # pylint: disable=protected-access receiver = self._root._gui_id - self._client.producer.send(MessageEndpoints.gui_instructions(receiver), rpc_msg) + self._client.connector.send(MessageEndpoints.gui_instructions(receiver), rpc_msg) if not wait_for_rpc_response: return None @@ -166,7 +167,7 @@ class RPCBase: """ response = None while response is None: - response = self._client.producer.get( + response = self._client.connector.get( MessageEndpoints.gui_instruction_response(request_id) ) return response diff --git a/bec_widgets/cli/server.py b/bec_widgets/cli/server.py index 5081bee6..72619ba1 100644 --- a/bec_widgets/cli/server.py +++ b/bec_widgets/cli/server.py @@ -27,25 +27,22 @@ class BECWidgetsCLIServer: """Start the figure window.""" self.fig.start() - @staticmethod - def _rpc_update_handler(msg, parent): - parent.on_rpc_update(msg.value) - def on_rpc_update(self, msg: dict, metadata: dict): + request_id = metadata.get("request_id") try: method = msg["action"] args = msg["parameter"].get("args", []) kwargs = msg["parameter"].get("kwargs", {}) - request_id = metadata.get("request_id") obj = self.get_object_from_config(msg["parameter"]) res = self.run_rpc(obj, method, args, kwargs) - self.send_response(request_id, True, {"result": res}) except Exception as e: print(e) self.send_response(request_id, False, {"error": str(e)}) + else: + self.send_response(request_id, True, {"result": res}) def send_response(self, request_id: str, accepted: bool, msg: dict): - self.client.producer.set( + self.client.connector.set( MessageEndpoints.gui_instruction_response(request_id), messages.RequestResponseMessage(accepted=accepted, message=msg), expire=60, diff --git a/bec_widgets/examples/mca_readout/mca_sim.py b/bec_widgets/examples/mca_readout/mca_sim.py index 2bb23e06..d1c5d695 100644 --- a/bec_widgets/examples/mca_readout/mca_sim.py +++ b/bec_widgets/examples/mca_readout/mca_sim.py @@ -2,7 +2,6 @@ from bec_lib import messages, MessageEndpoints, RedisConnector import time connector = RedisConnector("localhost:6379") -producer = connector.producer() metadata = {} scanID = "ScanID1" @@ -20,9 +19,7 @@ for ii in range(20): metadata=metadata, ).dumps() - # producer.send(topic=MessageEndpoints.device_status(device="mca"), msg=msg) - - producer.xadd( + connector.xadd( topic=MessageEndpoints.device_async_readback( scanID=scanID, device="mca" ), # scanID will be different for each scan diff --git a/bec_widgets/examples/stream_plot/stream_plot.py b/bec_widgets/examples/stream_plot/stream_plot.py index 274ceb2b..c6b3d7d2 100644 --- a/bec_widgets/examples/stream_plot/stream_plot.py +++ b/bec_widgets/examples/stream_plot/stream_plot.py @@ -40,7 +40,7 @@ class StreamPlot(QtWidgets.QWidget): uic.loadUi(os.path.join(current_path, "line_plot.ui"), self) self._idle_time = 100 - self.producer = RedisConnector(["localhost:6379"]).producer() + self.connector = RedisConnector(["localhost:6379"]) self.y_value_list = y_value_list self.previous_y_value_list = None @@ -214,7 +214,7 @@ class StreamPlot(QtWidgets.QWidget): ] } msg = messages.DeviceMessage(signals=return_dict).dumps() - self.producer.set_and_publish("px_stream/gui_event", msg=msg) + self.connector.set_and_publish("px_stream/gui_event", msg=msg) self.roi_signal.emit(region) def init_table(self): @@ -270,7 +270,7 @@ class StreamPlot(QtWidgets.QWidget): time.sleep(0.1) continue endpoint = f"px_stream/projection_{self._current_proj}/data" - msgs = self.client.producer.lrange(topic=endpoint, start=-1, end=-1) + msgs = self.client.connector.lrange(topic=endpoint, start=-1, end=-1) data = msgs if not data: continue @@ -295,7 +295,7 @@ class StreamPlot(QtWidgets.QWidget): def new_proj(self, content: dict, _metadata: dict): proj_nr = content["signals"]["proj_nr"] endpoint = f"px_stream/projection_{proj_nr}/metadata" - msg_raw = self.client.producer.get(topic=endpoint) + msg_raw = self.client.connector.get(topic=endpoint) msg = messages.DeviceMessage.loads(msg_raw) self._current_q = msg.content["signals"]["q"] self._current_norm = msg.content["signals"]["norm_sum"]