From ccbde45a48a19fb2a7580bc9f332b424ac06e41d Mon Sep 17 00:00:00 2001 From: appel_c Date: Sun, 27 Oct 2024 10:38:42 +0100 Subject: [PATCH] feat: add file_event as new SUB_EVENT to device manager; closes #335 --- bec_lib/bec_lib/messages.py | 30 +++++++++--- bec_lib/tests/test_bec_messages.py | 7 ++- .../device_server/devices/devicemanager.py | 47 +++++++++++++++++++ .../test_device_manager_ds.py | 27 +++++++++++ 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index aa19a437..b5d6cf4f 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -664,20 +664,38 @@ class FileMessage(BECMessage): file_path (str): Path to the file. done (bool): True if the file writing operation is done. successful (bool): True if the file writing operation was successful. - hinted_locations (dict, optional): Hinted location of important datasets within - the file. Can be used to automatically link a master file with its data files. - Defaults to None. - devices (list, optional): List of devices that are associated with the file. + device_name (str): Name of the device. If is_master_file is True, device_name is optional. + is_master_file (bool, optional): True if the file is a master file. Defaults to False. + file_type (str, optional): Type of the file. Defaults to "h5". + hinted_h5_entries (dict[str, str], optional): Dictionary with hinted h5 entries. Defaults to None. + This allows the file writer to automatically create external links within the master.h5 file + written by BEC under the entry for the specified device. The dictionary should contain the + sub-entries and to where these should link in the external h5 file (file_path). + Example for device_name='eiger', and dict('data' : '/entry/data/data'), the location + '/entry/collection/devices/eiger/data' within the master file will link to '/entry/data/data' + of the external file. metadata (dict, optional): Additional metadata. Defaults to None. """ msg_type: ClassVar[str] = "file_message" + file_path: str done: bool successful: bool - hinted_locations: dict[str, str] | None = None - devices: list[str] | None = None + is_master_file: bool = Field(default=False) + device_name: str | None = Field(default=None) + file_type: str = "h5" + hinted_h5_entries: dict[str, str] | None = None + + @field_validator("is_master_file", mode="after") + @classmethod + def check_is_master_file(cls, v: bool): + """Validate is the FileMessage is for the master file""" + if v is False: + return v + if v is True: + return cls.device_name != None class FileContentMessage(BECMessage): diff --git a/bec_lib/tests/test_bec_messages.py b/bec_lib/tests/test_bec_messages.py index e23a24e9..b7b8fd0f 100644 --- a/bec_lib/tests/test_bec_messages.py +++ b/bec_lib/tests/test_bec_messages.py @@ -211,7 +211,12 @@ def test_StatusMessage(): def test_FileMessage(): msg = messages.FileMessage( - file_path="/path/to/file", done=True, successful=True, metadata={"RID": "1234"} + device_name="samx", + file_path="/path/to/file", + done=True, + successful=True, + hinted_h5_entries={"data": "entry/data"}, + metadata={"RID": "1234"}, ) res = MsgpackSerialization.dumps(msg) res_loaded = MsgpackSerialization.loads(res) diff --git a/bec_server/bec_server/device_server/devices/devicemanager.py b/bec_server/bec_server/device_server/devices/devicemanager.py index e1012447..2fbe38f5 100644 --- a/bec_server/bec_server/device_server/devices/devicemanager.py +++ b/bec_server/bec_server/device_server/devices/devicemanager.py @@ -356,12 +356,15 @@ class DeviceManagerDS(DeviceManagerBase): obj.subscribe(self._obj_callback_device_monitor_2d, run=False) if "device_monitor_1d" in obj.event_types: obj.subscribe(self._obj_callback_device_monitor_1d, run=False) + if "file_event" in obj.event_types: + obj.subscribe(self._obj_callback_file_event, run=False) if "done_moving" in obj.event_types: obj.subscribe(self._obj_callback_done_moving, event_type="done_moving", run=False) if "flyer" in obj.event_types: obj.subscribe(self._obj_flyer_callback, event_type="flyer", run=False) if "progress" in obj.event_types: obj.subscribe(self._obj_progress_callback, event_type="progress", run=False) + if hasattr(obj, "motor_is_moving"): obj.motor_is_moving.subscribe(self._obj_callback_is_moving, run=opaas_obj.enabled) @@ -605,3 +608,47 @@ class DeviceManagerDS(DeviceManagerBase): value=value, max_value=max_value, done=done, metadata=metadata ) self.connector.set_and_publish(MessageEndpoints.device_progress(obj.root.name), msg) + + def _obj_callback_file_event( + self, + *_args, + obj, + file_path: str, + done: bool, + successful: bool, + file_type: str = "h5", + hinted_h5_entries: dict[str, str] | None = None, + **kwargs, + ): + """Callback for file events on devices. This callback set and publishes + a file message to the file_event and public_file endpoints in Redis to inform + the file writer and other services about externally created files. + + Args: + obj (OphydObject): ophyd object + file_path (str): file path to the created file + done (bool): if the file is done + successfull (bool): if the file was created successfully + file_type (str): Optional, file type. Default is h5. + hinted_h5_entry (dict[str, str] | None): Optional, hinted h5 entry. Please check FileMessage for more details + """ + device_name = obj.root.name + metadata = self.devices[device_name].metadata + if kwargs.get("metadata") is not None: + metadata.update(kwargs.get("metadata")) + scan_id = metadata.get("scan_id") + msg = messages.FileMessage( + file_path=file_path, + done=done, + successful=successful, + file_type=file_type, + device_name=device_name, + hinted_h5_entries=hinted_h5_entries, + metadata=metadata, + ) + pipe = self.connector.pipeline() + self.connector.set_and_publish(MessageEndpoints.file_event(device_name), msg, pipe=pipe) + self.connector.set_and_publish( + MessageEndpoints.public_file(scan_id=scan_id, name=device_name), msg, pipe=pipe + ) + pipe.execute() diff --git a/bec_server/tests/tests_device_server/test_device_manager_ds.py b/bec_server/tests/tests_device_server/test_device_manager_ds.py index 222906aa..cf36a0c7 100644 --- a/bec_server/tests/tests_device_server/test_device_manager_ds.py +++ b/bec_server/tests/tests_device_server/test_device_manager_ds.py @@ -202,3 +202,30 @@ def test_device_manager_ds_reset_config(dm_with_devices): mock_connector.lpush.assert_called_once_with( MessageEndpoints.device_config_history(), config_msg, max_size=50 ) + + +@pytest.mark.parametrize("device_manager_class", [DeviceManagerDS]) +def test_obj_callback_file_event(dm_with_devices, connected_connector): + device_manager = dm_with_devices + eiger = device_manager.devices.eiger + eiger.metadata = {"scan_id": "12345"} + # Use here fake redis connector, pipe is used and checks pydantic models + device_manager.connector = connected_connector + device_manager._obj_callback_file_event( + obj=eiger.obj, + file_path="test_file_path", + done=True, + successful=True, + hinted_h5_entries={"my_entry": "entry/data/data"}, + metadata={"user_info": "my_info"}, + ) + msg = connected_connector.get(MessageEndpoints.file_event(name="eiger")) + msg2 = connected_connector.get(MessageEndpoints.public_file(scan_id="12345", name="eiger")) + assert msg == msg2 + assert msg.content["file_path"] == "test_file_path" + assert msg.content["done"] is True + assert msg.content["successful"] is True + assert msg.content["hinted_h5_entries"] == {"my_entry": "entry/data/data"} + assert msg.content["file_type"] == "h5" + assert msg.metadata == {"scan_id": "12345", "user_info": "my_info"} + assert msg.content["is_master_file"] is False