refactor: replace deprecated consumer/producer with connector #77
@@ -13,7 +13,6 @@ logger = bec_logger.logger
|
||||
class PilatusConverter:
|
||||
def __init__(self, host: str, port: int) -> None:
|
||||
self._connector = RedisConnector(f"{host}:{port}")
|
||||
self._producer = self._connector.producer()
|
||||
|
||||
def start(self) -> None:
|
||||
"""start the consumer"""
|
||||
@@ -57,10 +56,9 @@ class PilatusConverter:
|
||||
"""
|
||||
Start the consumer.
|
||||
"""
|
||||
file_consumer = self._connector.consumer(
|
||||
self._connector.register(
|
||||
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
|
||||
)
|
||||
file_consumer.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -116,13 +116,12 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
shorten the movement time. In order to keep the last state, even if the
|
||||
server is restarted, the state is stored in a global variable in redis.
|
||||
"""
|
||||
producer = self.device_manager.producer
|
||||
msg = producer.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
|
||||
msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory"))
|
||||
if msg:
|
||||
val = msg.content.get("value", False)
|
||||
else:
|
||||
val = False
|
||||
producer.set(
|
||||
self.connector.set(
|
||||
MessageEndpoints.global_vars("reverse_flomni_trajectory"),
|
||||
messages.VariableMessage(value=(not val)),
|
||||
)
|
||||
@@ -280,7 +279,7 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
yield from self.stubs.kickoff(device="rtx")
|
||||
while True:
|
||||
yield from self.stubs.read(group="monitored")
|
||||
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
|
||||
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
|
||||
if status:
|
||||
status_id = status.content.get("status", 1)
|
||||
request_id = status.metadata.get("RID")
|
||||
|
||||
@@ -116,13 +116,12 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
shorten the movement time. In order to keep the last state, even if the
|
||||
server is restarted, the state is stored in a global variable in redis.
|
||||
"""
|
||||
producer = self.device_manager.producer
|
||||
msg = producer.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
|
||||
msg = self.connector.get(MessageEndpoints.global_vars("reverse_omny_trajectory"))
|
||||
if msg:
|
||||
val = msg.content.get("value", False)
|
||||
else:
|
||||
val = False
|
||||
producer.set(
|
||||
self.connector.set(
|
||||
MessageEndpoints.global_vars("reverse_omny_trajectory"),
|
||||
messages.VariableMessage(value=(not val)),
|
||||
)
|
||||
@@ -265,7 +264,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
yield from self.stubs.kickoff(device="rtx")
|
||||
while True:
|
||||
yield from self.stubs.read(group="monitored")
|
||||
status = self.device_manager.producer.get(MessageEndpoints.device_status("rt_scan"))
|
||||
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
|
||||
if status:
|
||||
status_id = status.content.get("status", 1)
|
||||
request_id = status.metadata.get("RID")
|
||||
|
||||
Reference in New Issue
Block a user