diff --git a/file_writer/file_writer/__init__.py b/file_writer/file_writer/__init__.py new file mode 100644 index 00000000..0b62c169 --- /dev/null +++ b/file_writer/file_writer/__init__.py @@ -0,0 +1 @@ +from .file_writer_manager import FileWriterManager diff --git a/file_writer/file_writer/file_writer.py b/file_writer/file_writer/file_writer.py new file mode 100644 index 00000000..b7190db2 --- /dev/null +++ b/file_writer/file_writer/file_writer.py @@ -0,0 +1,32 @@ +import abc + +import h5py + + +class FileWriter(abc.ABC): + def configure(self, *args, **kwargs): + ... + + @abc.abstractmethod + def write(self, file_path: str, data): + ... + + +class NeXusFileWriter(FileWriter): + def write(self, file_path: str, data): + file = h5py.File(file_path, "w") + entry = file.create_group("entry") + entry.attrs["NX_class"] = "NXentry" + entry.create_dataset("definition", "NXsas") + instrument = entry.create_group("instrument") + + device_storage = dict() + for point in range(data.num_points): + for dev in data.scan_segments[point]: + if dev not in device_storage: + device_storage[dev] = [data.scan_segments[point][dev][dev]["value"]] + device_storage[dev].append(data.scan_segments[point][dev][dev]["value"]) + for dev in device_storage: + instrument.create_dataset(name=dev, data=device_storage[dev]) + file.close() + print(f"writing file to {file_path}") diff --git a/file_writer/file_writer/file_writer_manager.py b/file_writer/file_writer/file_writer_manager.py new file mode 100644 index 00000000..e8486e49 --- /dev/null +++ b/file_writer/file_writer/file_writer_manager.py @@ -0,0 +1,102 @@ +import os + +from bluekafka_utils import ( + DeviceManagerBase, + KafkaMessage, + MessageEndpoints, + RedisConnector, +) +from bluekafka_utils.connector import ConnectorBase + +from file_writer.file_writer import NeXusFileWriter + + +class ScanStorage: + def __init__(self, scan_number: str, scanID: str) -> None: + self.scan_number = scan_number + self.scanID = scanID + self.scan_segments = dict() + self.scan_finished = False + self.num_points = None + + def append(self, pointID, data): + self.scan_segments[pointID] = data + + def ready_to_write(self) -> bool: + return self.scan_finished and (self.num_points + 1 == len(self.scan_segments)) + + +class FileWriterManager: + def __init__(self, bootstrap, Connector: ConnectorBase, scibec_url: str) -> None: + self.connector = Connector(bootstrap) + self.DM = DeviceManagerBase(self.connector, scibec_url) + self.DM.initialize(bootstrap) + self.producer = self.connector.producer() + self._start_scan_segment_consumer() + self._start_scan_status_consumer() + self.scan_storage = dict() + self.base_path = "./" # should be configured + self.file_writer = NeXusFileWriter() + + def _start_scan_segment_consumer(self): + self._scan_segment_consumer = self.connector.consumer( + pattern=MessageEndpoints.scan_segment(), + cb=self._scan_segment_callback, + parent=self, + ) + self._scan_segment_consumer.start() + + def _start_scan_status_consumer(self): + self._scan_status_consumer = self.connector.consumer( + MessageEndpoints.scan_status(), + cb=self._scan_status_callback, + parent=self, + ) + self._scan_status_consumer.start() + + @staticmethod + def _scan_segment_callback(msg, *, parent): + msg = KafkaMessage.ScanMessage.loads(msg.value) + parent.insert_to_scan_storage(msg) + + @staticmethod + def _scan_status_callback(msg, *, parent): + msg = KafkaMessage.ScanStatusMessage.loads(msg.value) + parent.update_scan_storage_with_status(msg) + + def update_scan_storage_with_status(self, msg: KafkaMessage.ScanStatusMessage): + scanID = msg.content.get("scanID") + if not self.scan_storage.get(scanID): + self.scan_storage[scanID] = ScanStorage( + scan_number=msg.content["info"].get("scan_number"), scanID=scanID + ) + if msg.content.get("status") == "closed": + self.scan_storage[scanID].scan_finished = True + self.scan_storage[scanID].num_points = msg.content["info"]["points"] + self.check_storage_status(scanID=scanID) + + def insert_to_scan_storage(self, msg: KafkaMessage.ScanMessage) -> None: + scanID = msg.content.get("scanID") + if scanID is not None: + if not self.scan_storage.get(scanID): + self.scan_storage[scanID] = ScanStorage( + scan_number=msg.metadata.get("scan_number"), scanID=scanID + ) + self.scan_storage[scanID].append( + pointID=msg.content.get("point_id"), data=msg.content.get("data") + ) + print(msg.content.get("point_id")) + self.check_storage_status(scanID=scanID) + + def check_storage_status(self, scanID: str): + if self.scan_storage[scanID].ready_to_write(): + self.write_file(scanID) + + def write_file(self, scanID: str): + storage = self.scan_storage[scanID] + file_path = os.path.join(self.base_path, f"S{storage.scan_number:05d}.h5") + self.file_writer.write(file_path=file_path, data=storage) + self.scan_storage.pop(scanID) + + def shutdown(self): + self.DM.shutdown()