refactor: replace deprecated consumer/producer with connector #77

Merged
wakonig_k merged 1 commits from refactor/deprecated_producer into main 2025-06-05 15:21:18 +02:00
3 changed files with 7 additions and 11 deletions

View File

@@ -13,7 +13,6 @@ logger = bec_logger.logger
class PilatusConverter:
def __init__(self, host: str, port: int) -> None:
self._connector = RedisConnector(f"{host}:{port}")
self._producer = self._connector.producer()
def start(self) -> None:
"""start the consumer"""
@@ -57,10 +56,9 @@ class PilatusConverter:
"""
Start the consumer.
"""
file_consumer = self._connector.consumer(
self._connector.register(
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
)
file_consumer.start()
if __name__ == "__main__":

View File

@@ -116,13 +116,12 @@ class FlomniFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
producer.set(
self.connector.set(
MessageEndpoints.global_vars("reverse_flomni_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -280,7 +279,7 @@ class FlomniFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")

View File

@@ -116,13 +116,12 @@ class OMNYFermatScan(SyncFlyScanBase):
shorten the movement time. In order to keep the last state, even if the
server is restarted, the state is stored in a global variable in redis.
"""
producer = self.device_manager.producer
msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
msg = self.connector.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
if msg:
val = msg.content.get("value", False)
else:
val = False
producer.set(
self.connector.set(
MessageEndpoints.global_vars("reverse_omny_trajectory"),
messages.VariableMessage(value=(not val)),
)
@@ -265,7 +264,7 @@ class OMNYFermatScan(SyncFlyScanBase):
yield from self.stubs.kickoff(device="rtx")
while True:
yield from self.stubs.read(group="monitored")
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
if status:
status_id = status.content.get("status", 1)
request_id = status.metadata.get("RID")