feat: add pilatus 2 converter to repository
This commit is contained in:
76
bin/cbf_converter/cbf_converter.py
Normal file
76
bin/cbf_converter/cbf_converter.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector, bec_logger
|
||||
from bec_lib.core.redis_connector import MessageObject
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class PilatusConverter:
|
||||
def __init__(self, host: str, port: int) -> None:
|
||||
self._connector = RedisConnector(f"{host}:{port}")
|
||||
self._producer = self._connector.producer()
|
||||
|
||||
def start(self) -> None:
|
||||
"""start the consumer"""
|
||||
self.start_consumer()
|
||||
|
||||
@staticmethod
|
||||
def on_new_message(message: MessageObject, *, parent: PilatusConverter) -> None:
|
||||
"""
|
||||
Callback function for new file messages.
|
||||
|
||||
Args:
|
||||
message (MessageObject): Message object
|
||||
parent (PilatusConverter): Parent object
|
||||
"""
|
||||
msg = BECMessage.MessageReader.loads(message.value)
|
||||
print(msg)
|
||||
if not msg:
|
||||
return
|
||||
parent.process_msg(msg)
|
||||
|
||||
def process_msg(self, message: BECMessage.FileMessage) -> None:
|
||||
"""
|
||||
Callback function for new file messages.
|
||||
"""
|
||||
try:
|
||||
file_path = message.content["file_path"]
|
||||
input_path = message.metadata["input_path"]
|
||||
converter_path = os.path.expanduser(
|
||||
"~/Data10/cxs_software/base/+beamline/+nexus/private/cbf2hdf5/cbf2hdf5"
|
||||
)
|
||||
subprocess.run(
|
||||
f"{converter_path} -s {input_path} -o {file_path} -p /entry/instrument/pilatus_2",
|
||||
shell=True,
|
||||
check=True,
|
||||
)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.error(f"Failed to process message {message} with error {exc}")
|
||||
return
|
||||
|
||||
def start_consumer(self) -> None:
|
||||
"""
|
||||
Start the consumer.
|
||||
"""
|
||||
file_consumer = self._connector.consumer(
|
||||
MessageEndpoints.file_event("pilatus_2"), cb=self.on_new_message, parent=self
|
||||
)
|
||||
file_consumer.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import threading
|
||||
|
||||
pilatus_converter = PilatusConverter("localhost", 6379)
|
||||
pilatus_converter.start()
|
||||
|
||||
threading.Event().wait()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
4
bin/cbf_converter/launch_cbf_converter.sh
Executable file
4
bin/cbf_converter/launch_cbf_converter.sh
Executable file
@@ -0,0 +1,4 @@
|
||||
source ~/Data10/software/bec/bec_venv/bin/activate
|
||||
module add gcc/9.3.0
|
||||
module add hdf5_serial/1.12.2
|
||||
python ~/Data10/software/cbf_converter/cbf_converter.py
|
||||
Reference in New Issue
Block a user