71 lines
2.0 KiB
Python
71 lines
2.0 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import subprocess
|
|
|
|
from bec_lib import bec_logger, messages
|
|
from bec_lib.endpoints import MessageEndpoints
|
|
from bec_lib.redis_connector import MessageObject, RedisConnector
|
|
|
|
# Module-level logger; used below to report messages that fail to convert.
logger = bec_logger.logger
|
|
|
|
|
|
class PilatusConverter:
    """
    Run the external ``cbf2hdf5`` converter for every ``pilatus_2`` file event.

    The class registers a callback on the ``pilatus_2`` file-event endpoint and,
    for each non-empty message, launches the converter as a subprocess.
    """

    # Location of the converter executable; ``~`` is expanded when the command is built.
    _CONVERTER_PATH = "~/Data10/cxs_software/base/+beamline/+nexus/private/cbf2hdf5/cbf2hdf5"
    # Value handed to the converter's ``-p`` option.
    _HDF5_ENTRY = "/entry/instrument/pilatus_2"

    def __init__(self, host: str, port: int) -> None:
        """
        Create the Redis connector used to receive file events.

        Args:
            host (str): Redis host name
            port (int): Redis port
        """
        self._connector = RedisConnector(f"{host}:{port}")

    def start(self) -> None:
        """Start the consumer."""
        self.start_consumer()

    @staticmethod
    def on_new_message(message: MessageObject, *, parent: PilatusConverter) -> None:
        """
        Callback function for new file messages.

        Empty messages are ignored; everything else is forwarded to
        ``parent.process_msg``.

        Args:
            message (MessageObject): Message object
            parent (PilatusConverter): Parent object
        """
        msg = message.value
        # Report through the module logger instead of a bare print().
        logger.info(f"Received file event: {msg}")
        if not msg:
            return
        parent.process_msg(msg)

    @classmethod
    def _build_command(cls, input_path: str, file_path: str) -> list[str]:
        """Return the argv list that runs the converter for one file."""
        # Every value is its own argv element, so paths containing spaces or
        # shell metacharacters are passed verbatim and never interpreted.
        return [
            os.path.expanduser(cls._CONVERTER_PATH),
            "-s",
            str(input_path),
            "-o",
            str(file_path),
            "-p",
            cls._HDF5_ENTRY,
        ]

    def process_msg(self, message: messages.FileMessage) -> None:
        """
        Run the converter for the file described by ``message``.

        ``message.content["file_path"]`` is passed to the converter's ``-o``
        option and ``message.metadata["input_path"]`` to its ``-s`` option.
        The converter is started without a shell, so the paths are not subject
        to shell expansion or word splitting.

        Any failure (missing key, converter not found, non-zero exit status) is
        logged and swallowed so that it does not propagate to the caller.

        Args:
            message (messages.FileMessage): File message to process
        """
        try:
            command = self._build_command(
                file_path=message.content["file_path"],
                input_path=message.metadata["input_path"],
            )
            subprocess.run(command, check=True)
        except Exception as exc:  # pylint: disable=broad-except
            logger.error(f"Failed to process message {message} with error {exc}")

    def start_consumer(self) -> None:
        """
        Start the consumer.

        Registers ``on_new_message`` on the ``pilatus_2`` file-event endpoint and
        passes this instance along as ``parent``.
        """
        self._connector.register(
            MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
        )
|
|
|
|
|
|
if __name__ == "__main__":
    import threading

    # Connect to the Redis instance on this machine and register the callback.
    pilatus_converter = PilatusConverter(host="localhost", port=6379)
    pilatus_converter.start()

    # Nothing ever sets this event, so waiting on it blocks the main thread
    # indefinitely. Presumably the connector delivers messages from its own
    # thread while the script idles here -- TODO confirm.
    keep_alive = threading.Event()
    keep_alive.wait()
|