Files
csaxs_bec/bin/cbf_converter/cbf_converter.py

71 lines
2.0 KiB
Python

from __future__ import annotations
import os
import subprocess
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import MessageObject, RedisConnector
logger = bec_logger.logger
class PilatusConverter:
def __init__(self, host: str, port: int) -> None:
self._connector = RedisConnector(f"{host}:{port}")
def start(self) -> None:
"""start the consumer"""
self.start_consumer()
@staticmethod
def on_new_message(message: MessageObject, *, parent: PilatusConverter) -> None:
"""
Callback function for new file messages.
Args:
message (MessageObject): Message object
parent (PilatusConverter): Parent object
"""
msg = message.value
print(msg)
if not msg:
return
parent.process_msg(msg)
def process_msg(self, message: messages.FileMessage) -> None:
"""
Callback function for new file messages.
"""
try:
file_path = message.content["file_path"]
input_path = message.metadata["input_path"]
converter_path = os.path.expanduser(
"~/Data10/cxs_software/base/+beamline/+nexus/private/cbf2hdf5/cbf2hdf5"
)
subprocess.run(
f"{converter_path} -s {input_path} -o {file_path} -p /entry/instrument/pilatus_2",
shell=True,
check=True,
)
except Exception as exc: # pylint: disable=broad-except
logger.error(f"Failed to process message {message} with error {exc}")
return
def start_consumer(self) -> None:
"""
Start the consumer.
"""
self._connector.register(
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
)
if __name__ == "__main__":
import threading
pilatus_converter = PilatusConverter("localhost", 6379)
pilatus_converter.start()
threading.Event().wait()