diff --git a/bec_server/bec_server/scihub/scibec/scibec_connector.py b/bec_server/bec_server/scihub/scibec/scibec_connector.py index 064ddaa1..df29ffdd 100644 --- a/bec_server/bec_server/scihub/scibec/scibec_connector.py +++ b/bec_server/bec_server/scihub/scibec/scibec_connector.py @@ -3,6 +3,7 @@ from __future__ import annotations import os from typing import TYPE_CHECKING +import py_scibec from dotenv import dotenv_values from py_scibec import SciBecCore @@ -176,29 +177,24 @@ class SciBecConnector: self.scibec.login(username=self.ingestor, password=self.ingestor_secret) def _update_experiment_info(self): - beamline_info = self.scibec.beamline.beamline_controller_find( - query_params={"filter": {"where": {"name": self.target_bl}}} - ) - if not beamline_info.body: + bl_filter = py_scibec.bec.BeamlineFilterWhere(where={"name": self.target_bl}) + beamline_info = self.scibec.beamline.beamline_controller_find(bl_filter) + if not beamline_info: raise SciBecConnectorError(f"Could not find a beamline with the name {self.target_bl}") - beamline_info = beamline_info.body[0] - self.scibec_info["beamline"] = beamline_info - experiment_id = beamline_info.get("activeExperiment") + self.scibec_info["beamline"] = beamline_info[0] + experiment_id = beamline_info[0].active_experiment if not experiment_id: raise SciBecConnectorError(f"Could not find an active experiment on {self.target_bl}") - experiment = self.scibec.experiment.experiment_controller_find_by_id( - path_params={"id": experiment_id} - ) + experiment = self.scibec.experiment.experiment_controller_find_by_id(experiment_id) if not experiment: raise SciBecConnectorError(f"Could not find an experiment with the id {experiment_id}") - experiment = experiment.body self.scibec_info["activeExperiment"] = experiment def _update_eaccount_in_redis(self): - write_account = self.scibec_info["activeExperiment"]["writeAccount"] + write_account = self.scibec_info["activeExperiment"].write_account if write_account[0] == "p": write_account = write_account.replace("p", "e") msg = messages.VariableMessage(value=write_account) diff --git a/bec_server/bec_server/scihub/scibec/scibec_metadata_handler.py b/bec_server/bec_server/scihub/scibec/scibec_metadata_handler.py index 5aeb65a9..428feb63 100644 --- a/bec_server/bec_server/scihub/scibec/scibec_metadata_handler.py +++ b/bec_server/bec_server/scihub/scibec/scibec_metadata_handler.py @@ -5,6 +5,8 @@ import time from typing import TYPE_CHECKING, Any import numpy as np +import py_scibec +import py_scibec_openapi_client.models as py_scibec_models from bec_lib import MessageEndpoints, bec_logger from bec_lib.serialization import json_ext @@ -67,59 +69,60 @@ class SciBecMetadataHandler: if not scibec: return scibec_info = self.scibec_connector.scibec_info - experiment_id = scibec_info["activeExperiment"]["id"] + experiment_id = scibec_info["activeExperiment"].id # session_id = scibec_info["activeSession"][0]["id"] # experiment_id = scibec_info["activeSession"][0]["experimentId"] logger.debug(f"Received new scan status {msg}") - scan = scibec.scan.scan_controller_find( - query_params={"filter": {"where": {"scanId": msg.content["scan_id"]}}} - ).body + scan_filter = py_scibec.bec.ScanFilterWhere(where={"scanId": msg.content["scan_id"]}) + scan = scibec.scan.scan_controller_find(scan_filter) if not scan: info = msg.content["info"] dataset_number = info.get("dataset_number") - dataset = scibec.dataset.dataset_controller_find( - query_params={ - "filter": {"where": {"number": dataset_number, "experimentId": experiment_id}} - } - ).body + dataset_filter = py_scibec.bec.DatasetFilterWhere( + where={"number": dataset_number, "experimentId": experiment_id} + ) + dataset = scibec.dataset.dataset_controller_find(dataset_filter) if dataset: dataset = dataset[0] else: - dataset = scibec.dataset.dataset_controller_create( - body=scibec.models.Dataset( - **{ - "readACL": scibec_info["activeExperiment"]["readACL"], - "writeACL": scibec_info["activeExperiment"]["readACL"], - "owner": scibec_info["activeExperiment"]["owner"], - "number": dataset_number, - "experimentId": experiment_id, - "name": info.get("dataset_name", ""), - } - ) - ).body + new_dataset = py_scibec.bec.NewDataset( + **{ + "readACL": scibec_info["activeExperiment"].read_acl, + "writeACL": scibec_info["activeExperiment"].read_acl, + "owner": scibec_info["activeExperiment"].owner, + "number": dataset_number, + "experimentId": experiment_id, + "name": info.get("dataset_name", ""), + } + ) + dataset = scibec.dataset.dataset_controller_create(new_dataset) - scan_data = { - "readACL": scibec_info["activeExperiment"]["readACL"], - "writeACL": scibec_info["activeExperiment"]["readACL"], - "owner": scibec_info["activeExperiment"]["owner"], - "scanType": info.get("scan_name", ""), - "scanId": info.get("scan_id", ""), - "queueId": info.get("queue_id", ""), - "requestId": info.get("RID", ""), - "exitStatus": msg.content["status"], - # "queue": info.get("stream", ""), - "metadata": info, - # "sessionId": session_id, - "datasetId": dataset["id"], - "scanNumber": info.get("scan_number", 0), - } - scan = scibec.scan.scan_controller_create(body=scibec.models.Scan(**scan_data)).body + new_scan = py_scibec.bec.NewScan( + **{ + "readACL": scibec_info["activeExperiment"].read_acl, + "writeACL": scibec_info["activeExperiment"].read_acl, + "owner": scibec_info["activeExperiment"].owner, + "scanType": info.get("scan_name", ""), + "scanId": info.get("scan_id", ""), + "queueId": info.get("queue_id", ""), + "requestId": info.get("RID", ""), + "exitStatus": msg.content["status"], + # "queue": info.get("stream", ""), + "metadata": info, + # "sessionId": session_id, + "datasetId": dataset.id, + "scanNumber": info.get("scan_number", 0), + } + ) + scan = scibec.scan.scan_controller_create(new_scan) # scan = scibec.add_scan(scan_data) else: info = msg.content["info"] scan = scibec.scan.scan_controller_update_by_id( - path_params={"id": scan[0]["id"]}, - body={"metadata": info, "exitStatus": msg.content["status"]}, + id=scan[0].id, + scan_partial=py_scibec.bec.ScanPartial( + metadata=info, exitStatus=msg.content["status"] + ), ) return scan @@ -174,9 +177,8 @@ class SciBecMetadataHandler: scibec = self.scibec_connector.scibec if not scibec: return - scan = scibec.scan.scan_controller_find( - query_params={"filter": {"where": {"scanId": data["metadata"]["scan_id"]}}} - ).body + scan_filter = py_scibec.bec.ScanFilterWhere(where={"scanId": data["metadata"]["scan_id"]}) + scan = scibec.scan.scan_controller_find(scan_filter) if not scan: logger.warning( f"Could not find scan with scan_id {data['metadata']['scan_id']}. Cannot write scan" @@ -195,24 +197,24 @@ class SciBecMetadataHandler: ) self._write_scan_data_chunks(file_path, data_bec, scan) else: - - scibec.scan_data.scan_data_controller_create_many( - body=scibec.models.ScanData( - **{ - "readACL": scan["readACL"], - "writeACL": scan["readACL"], - "owner": scan["owner"], - "scanId": scan["id"], - "filePath": file_path, - "data": data_bec, - } - ) + new_scan_data = py_scibec.bec.NewScanData( + **{ + "readACL": scan.read_acl, + "writeACL": scan.read_acl, + "owner": scan.owner, + "scanId": scan.id, + "filePath": file_path, + "data": data_bec, + } ) + scibec.scan_data.scan_data_controller_create_many(new_scan_data) logger.info( f"Wrote scan data to SciBec for scan_id {data['metadata']['scan_id']} in {time.time() - start} seconds." ) - def _write_scan_data_chunks(self, file_path: str, data_bec: dict, scan: dict): + def _write_scan_data_chunks( + self, file_path: str, data_bec: dict, scan: py_scibec_models.ScanWithRelations + ): """ Write the scan data to SciBec in chunks. This method is called if the scan data is larger than 1 MB. The method loops through all keys in the data dictionary and creates chunks of @@ -234,35 +236,33 @@ class SciBecMetadataHandler: ) continue if len(json.dumps(chunk)) + len(json.dumps({key: value})) > self.MAX_DATA_SIZE: - scibec.scan_data.scan_data_controller_create_many( - body=scibec.models.ScanData( - **{ - "readACL": scan["readACL"], - "writeACL": scan["readACL"], - "owner": scan["owner"], - "scanId": scan["id"], - "filePath": file_path, - "data": chunk, - } - ) + new_scan_data = py_scibec.bec.NewScanData( + **{ + "readACL": scan.read_acl, + "writeACL": scan.read_acl, + "owner": scan.owner, + "scanId": scan.id, + "filePath": file_path, + "data": chunk, + } ) + scibec.scan_data.scan_data_controller_create_many(new_scan_data) chunk = {} chunk[key] = value # Write the last chunk if chunk: - scibec.scan_data.scan_data_controller_create_many( - body=scibec.models.ScanData( - **{ - "readACL": scan["readACL"], - "writeACL": scan["readACL"], - "owner": scan["owner"], - "scanId": scan["id"], - "filePath": file_path, - "data": chunk, - } - ) + new_scan_data = py_scibec.bec.NewScanData( + **{ + "readACL": scan.read_acl, + "writeACL": scan.read_acl, + "owner": scan.owner, + "scanId": scan.id, + "filePath": file_path, + "data": chunk, + } ) + scibec.scan_data.scan_data_controller_create_many(new_scan_data) def shutdown(self): """ diff --git a/bec_server/tests/tests_scihub/conftest.py b/bec_server/tests/tests_scihub/conftest.py index 663d9f61..16e02629 100644 --- a/bec_server/tests/tests_scihub/conftest.py +++ b/bec_server/tests/tests_scihub/conftest.py @@ -1,4 +1,8 @@ import pytest +from py_scibec_openapi_client.models.beamline_with_relations import BeamlineWithRelations +from py_scibec_openapi_client.models.dataset_with_relations import DatasetWithRelations +from py_scibec_openapi_client.models.experiment_with_relations import ExperimentWithRelations +from py_scibec_openapi_client.models.scan_with_relations import ScanWithRelations from bec_lib import bec_logger @@ -10,3 +14,36 @@ from bec_lib import bec_logger def threads_check(threads_check): yield bec_logger.logger.remove() + + +@pytest.fixture() +def active_experiment(): + return ExperimentWithRelations( + name="dummy_experiment", + writeAccount="p12345", + id="dummy_experiment_id", + beamlineId="dummy_id", + ) + + +@pytest.fixture() +def beamline_document(): + return BeamlineWithRelations( + name="dummy_bl", activeExperiment="dummy_experiment", id="dummy_id" + ) + + +@pytest.fixture() +def dataset_document(): + return DatasetWithRelations(name="dummy_dataset", id="dummy_id") + + +@pytest.fixture() +def scan_document(): + return ScanWithRelations( + name="dummy_scan", + id="dummy_id", + readACL=["readACL"], + writeACL=["writeACL"], + owner=["owner"], + ) diff --git a/bec_server/tests/tests_scihub/test_scibec_connector.py b/bec_server/tests/tests_scihub/test_scibec_connector.py index adf704b4..00e2d4fa 100644 --- a/bec_server/tests/tests_scihub/test_scibec_connector.py +++ b/bec_server/tests/tests_scihub/test_scibec_connector.py @@ -3,7 +3,6 @@ from unittest import mock import pytest from bec_lib import MessageEndpoints, ServiceConfig, messages -from bec_lib.logger import bec_logger from bec_lib.messages import BECStatus from bec_lib.tests.utils import ConnectorMock from bec_server.scihub import SciHub @@ -114,36 +113,18 @@ def test_scibec_connect_to_scibec(SciBecMock): mock_update_eaccount_in_redis.assert_called_once() -class ServerResponseMock: - def __init__(self, body): - self.body = body - - -def test_scibec_update_experiment_info(SciBecMock): +def test_scibec_update_experiment_info(SciBecMock, active_experiment, beamline_document): with mock.patch.object(SciBecMock, "scibec") as mock_scibec: - beamline_document = { - "name": "dummy_bl", - "activeExperiment": "dummy_experiment", - "id": "dummy_id", - } - mock_scibec.beamline.beamline_controller_find.return_value = ServerResponseMock( - (beamline_document,) - ) - experiment_document = { - "name": "dummy_experiment", - "writeAccount": "dummy_write_account", - "id": "dummy_experiment_id", - } - mock_scibec.experiment.experiment_controller_find_by_id.return_value = ServerResponseMock( - experiment_document - ) + mock_scibec.beamline.beamline_controller_find.return_value = (beamline_document,) + experiment_document = active_experiment + mock_scibec.experiment.experiment_controller_find_by_id.return_value = experiment_document SciBecMock._update_experiment_info() assert SciBecMock.scibec_info["activeExperiment"] == experiment_document assert SciBecMock.scibec_info["beamline"] == beamline_document -def test_update_eaccount_in_redis(SciBecMock): - SciBecMock.scibec_info = {"activeExperiment": {"writeAccount": "p12345"}} +def test_update_eaccount_in_redis(SciBecMock, active_experiment): + SciBecMock.scibec_info = {"activeExperiment": active_experiment} with mock.patch.object(SciBecMock, "connector") as mock_connector: SciBecMock._update_eaccount_in_redis() mock_connector.set.assert_called_once_with( diff --git a/bec_server/tests/tests_scihub/test_scibec_metadata_handler.py b/bec_server/tests/tests_scihub/test_scibec_metadata_handler.py index 947d6373..52061a8a 100644 --- a/bec_server/tests/tests_scihub/test_scibec_metadata_handler.py +++ b/bec_server/tests/tests_scihub/test_scibec_metadata_handler.py @@ -2,6 +2,7 @@ from unittest import mock import numpy as np import pytest +from py_scibec_openapi_client.models import NewScanData, ScanPartial from bec_lib import messages from bec_lib.redis_connector import MessageObject @@ -46,48 +47,33 @@ def test_update_scan_status_returns_without_scibec(md_handler): md_handler.update_scan_status(msg) -def test_update_scan_status(md_handler): +def test_update_scan_status(md_handler, active_experiment, dataset_document): # pylint: disable=protected-access msg = messages.ScanStatusMessage(scan_id="scan_id", status="open", info={"dataset_number": 12}) scibec = mock.Mock() md_handler.scibec_connector.scibec = scibec - scibec_info = { - "activeExperiment": { - "id": "id", - "readACL": ["readACL"], - "writeACL": ["writeACL"], - "owner": "owner", - } - } + scibec_info = {"activeExperiment": active_experiment} md_handler.scibec_connector.scibec_info = scibec_info - type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[]) - type(scibec.dataset.dataset_controller_find()).body = mock.PropertyMock(return_value=[]) - type(scibec.dataset.dataset_controller_create()).body = mock.PropertyMock( - return_value={"id": "id"} - ) + scibec.scan.scan_controller_find = mock.MagicMock(return_value=[]) + scibec.dataset.dataset_controller_find = mock.MagicMock(return_value=[]) + scibec.dataset.dataset_controller_create = mock.MagicMock(return_value=dataset_document) md_handler.update_scan_status(msg) -def test_update_scan_status_patch(md_handler): +def test_update_scan_status_patch(md_handler, active_experiment, scan_document): # pylint: disable=protected-access msg = messages.ScanStatusMessage( scan_id="scan_id", status="closed", info={"dataset_number": 12} ) scibec = mock.Mock() md_handler.scibec_connector.scibec = scibec - scibec_info = { - "activeExperiment": { - "id": "id", - "readACL": ["readACL"], - "writeACL": ["writeACL"], - "owner": "owner", - } - } + scibec_info = {"activeExperiment": active_experiment} md_handler.scibec_connector.scibec_info = scibec_info - type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[{"id": "id"}]) + scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document]) md_handler.update_scan_status(msg) scibec.scan.scan_controller_update_by_id.assert_called_once_with( - path_params={"id": "id"}, body={"metadata": {"dataset_number": 12}, "exitStatus": "closed"} + id="dummy_id", + scan_partial=ScanPartial(exit_status="closed", metadata={"dataset_number": 12}), ) @@ -124,31 +110,27 @@ def test_update_scan_data_without_scan(md_handler): # pylint: disable=protected-access scibec = mock.Mock() md_handler.scibec_connector.scibec = scibec - type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[]) + scibec.scan.scan_controller_find = mock.MagicMock(return_value=[]) md_handler.update_scan_data( file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}} ) -def test_update_scan_data(md_handler): +def test_update_scan_data(md_handler, scan_document): # pylint: disable=protected-access scibec = mock.Mock() md_handler.scibec_connector.scibec = scibec - type(scibec.scan.scan_controller_find()).body = mock.PropertyMock( - return_value=[ - {"id": "id", "readACL": ["readACL"], "writeACL": ["writeACL"], "owner": "owner"} - ] - ) + scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document]) md_handler.update_scan_data( file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}} ) scibec.scan_data.scan_data_controller_create_many.assert_called_once_with( - body=scibec.models.ScanData( + NewScanData( **{ - "readACL": "readACL", - "writeACL": "readACL", - "owner": "owner", - "scanId": "id", + "readACL": ["readACL"], + "writeACL": ["readACL"], + "owner": ["owner"], + "scanId": "dummy_id", "filePath": "my_file.h5", "data": {"data": {}, "metadata": {"scan_id": "scan_id"}}, } @@ -156,16 +138,12 @@ def test_update_scan_data(md_handler): ) -def test_update_scan_data_exceeding_limit(md_handler): +def test_update_scan_data_exceeding_limit(md_handler, scan_document): # pylint: disable=protected-access scibec = mock.Mock() md_handler.MAX_DATA_SIZE = 1000 md_handler.scibec_connector.scibec = scibec - type(scibec.scan.scan_controller_find()).body = mock.PropertyMock( - return_value=[ - {"id": "id", "readACL": ["readACL"], "writeACL": ["writeACL"], "owner": "owner"} - ] - ) + scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document]) data_block = {f"key_{i}": {"signal": list(range(100))} for i in range(10)} data_block.update({"metadata": {"scan_id": "scan_id"}}) md_handler.update_scan_data(file_path="my_file.h5", data=data_block)