From 19d117619ad0f4a77efd704732ed1cdb6a93a957 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Wed, 4 Jun 2025 19:38:30 +0200 Subject: [PATCH] refactor: replace deprecated consumer/producer with connector --- bin/cbf_converter/cbf_converter.py | 4 +--- csaxs_bec/scans/flomni_fermat_scan.py | 7 +++---- csaxs_bec/scans/omny_fermat_scan.py | 7 +++---- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/bin/cbf_converter/cbf_converter.py b/bin/cbf_converter/cbf_converter.py index b022858..e2e5a1a 100644 --- a/bin/cbf_converter/cbf_converter.py +++ b/bin/cbf_converter/cbf_converter.py @@ -13,7 +13,6 @@ logger = bec_logger.logger class PilatusConverter: def __init__(self, host: str, port: int) -> None: self._connector = RedisConnector(f"{host}:{port}") - self._producer = self._connector.producer() def start(self) -> None: """start the consumer""" @@ -57,10 +56,9 @@ class PilatusConverter: """ Start the consumer. """ - file_consumer = self._connector.consumer( + self._connector.register( MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self ) - file_consumer.start() if __name__ == "__main__": diff --git a/csaxs_bec/scans/flomni_fermat_scan.py b/csaxs_bec/scans/flomni_fermat_scan.py index 51ccef8..f1cc55c 100644 --- a/csaxs_bec/scans/flomni_fermat_scan.py +++ b/csaxs_bec/scans/flomni_fermat_scan.py @@ -116,13 +116,12 @@ class FlomniFermatScan(SyncFlyScanBase): shorten the movement time. In order to keep the last state, even if the server is restarted, the state is stored in a global variable in redis. """ - producer = self.device_manager.producer - msg = producer.get(MessageEndpoints.global_vars("reverse_flomni_trajectory")) + msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory")) if msg: val = msg.content.get("value", False) else: val = False - producer.set( + self.connector.set( MessageEndpoints.global_vars("reverse_flomni_trajectory"), messages.VariableMessage(value=(not val)), ) @@ -280,7 +279,7 @@ class FlomniFermatScan(SyncFlyScanBase): yield from self.stubs.kickoff(device="rtx") while True: yield from self.stubs.read(group="monitored") - status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan")) + status = self.connector.get(MessageEndpoints.device_status("rt_scan")) if status: status_id = status.content.get("status", 1) request_id = status.metadata.get("RID") diff --git a/csaxs_bec/scans/omny_fermat_scan.py b/csaxs_bec/scans/omny_fermat_scan.py index 7a994a6..4529ee8 100644 --- a/csaxs_bec/scans/omny_fermat_scan.py +++ b/csaxs_bec/scans/omny_fermat_scan.py @@ -116,13 +116,12 @@ class OMNYFermatScan(SyncFlyScanBase): shorten the movement time. In order to keep the last state, even if the server is restarted, the state is stored in a global variable in redis. """ - producer = self.device_manager.producer - msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory")) + msg = self.connector.get(MessageEndpoints.global_vars("reverse_omny_trajectory")) if msg: val = msg.content.get("value", False) else: val = False - producer.set( + self.connector.set( MessageEndpoints.global_vars("reverse_omny_trajectory"), messages.VariableMessage(value=(not val)), ) @@ -265,7 +264,7 @@ class OMNYFermatScan(SyncFlyScanBase): yield from self.stubs.kickoff(device="rtx") while True: yield from self.stubs.read(group="monitored") - status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan")) + status = self.connector.get(MessageEndpoints.device_status("rt_scan")) if status: status_id = status.content.get("status", 1) request_id = status.metadata.get("RID") -- 2.49.1