fix(scihub): fixed scibec connector for new api

This commit is contained in:
wakonig_k 2024-05-02 16:08:35 +02:00
parent f1812e558b
commit fc94c827e4
5 changed files with 149 additions and 157 deletions

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import os import os
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import py_scibec
from dotenv import dotenv_values from dotenv import dotenv_values
from py_scibec import SciBecCore from py_scibec import SciBecCore
@ -176,29 +177,24 @@ class SciBecConnector:
self.scibec.login(username=self.ingestor, password=self.ingestor_secret) self.scibec.login(username=self.ingestor, password=self.ingestor_secret)
def _update_experiment_info(self): def _update_experiment_info(self):
beamline_info = self.scibec.beamline.beamline_controller_find( bl_filter = py_scibec.bec.BeamlineFilterWhere(where={"name": self.target_bl})
query_params={"filter": {"where": {"name": self.target_bl}}} beamline_info = self.scibec.beamline.beamline_controller_find(bl_filter)
) if not beamline_info:
if not beamline_info.body:
raise SciBecConnectorError(f"Could not find a beamline with the name {self.target_bl}") raise SciBecConnectorError(f"Could not find a beamline with the name {self.target_bl}")
beamline_info = beamline_info.body[0] self.scibec_info["beamline"] = beamline_info[0]
self.scibec_info["beamline"] = beamline_info experiment_id = beamline_info[0].active_experiment
experiment_id = beamline_info.get("activeExperiment")
if not experiment_id: if not experiment_id:
raise SciBecConnectorError(f"Could not find an active experiment on {self.target_bl}") raise SciBecConnectorError(f"Could not find an active experiment on {self.target_bl}")
experiment = self.scibec.experiment.experiment_controller_find_by_id( experiment = self.scibec.experiment.experiment_controller_find_by_id(experiment_id)
path_params={"id": experiment_id}
)
if not experiment: if not experiment:
raise SciBecConnectorError(f"Could not find an experiment with the id {experiment_id}") raise SciBecConnectorError(f"Could not find an experiment with the id {experiment_id}")
experiment = experiment.body
self.scibec_info["activeExperiment"] = experiment self.scibec_info["activeExperiment"] = experiment
def _update_eaccount_in_redis(self): def _update_eaccount_in_redis(self):
write_account = self.scibec_info["activeExperiment"]["writeAccount"] write_account = self.scibec_info["activeExperiment"].write_account
if write_account[0] == "p": if write_account[0] == "p":
write_account = write_account.replace("p", "e") write_account = write_account.replace("p", "e")
msg = messages.VariableMessage(value=write_account) msg = messages.VariableMessage(value=write_account)

View File

@ -5,6 +5,8 @@ import time
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import numpy as np import numpy as np
import py_scibec
import py_scibec_openapi_client.models as py_scibec_models
from bec_lib import MessageEndpoints, bec_logger from bec_lib import MessageEndpoints, bec_logger
from bec_lib.serialization import json_ext from bec_lib.serialization import json_ext
@ -67,41 +69,39 @@ class SciBecMetadataHandler:
if not scibec: if not scibec:
return return
scibec_info = self.scibec_connector.scibec_info scibec_info = self.scibec_connector.scibec_info
experiment_id = scibec_info["activeExperiment"]["id"] experiment_id = scibec_info["activeExperiment"].id
# session_id = scibec_info["activeSession"][0]["id"] # session_id = scibec_info["activeSession"][0]["id"]
# experiment_id = scibec_info["activeSession"][0]["experimentId"] # experiment_id = scibec_info["activeSession"][0]["experimentId"]
logger.debug(f"Received new scan status {msg}") logger.debug(f"Received new scan status {msg}")
scan = scibec.scan.scan_controller_find( scan_filter = py_scibec.bec.ScanFilterWhere(where={"scanId": msg.content["scan_id"]})
query_params={"filter": {"where": {"scanId": msg.content["scan_id"]}}} scan = scibec.scan.scan_controller_find(scan_filter)
).body
if not scan: if not scan:
info = msg.content["info"] info = msg.content["info"]
dataset_number = info.get("dataset_number") dataset_number = info.get("dataset_number")
dataset = scibec.dataset.dataset_controller_find( dataset_filter = py_scibec.bec.DatasetFilterWhere(
query_params={ where={"number": dataset_number, "experimentId": experiment_id}
"filter": {"where": {"number": dataset_number, "experimentId": experiment_id}} )
} dataset = scibec.dataset.dataset_controller_find(dataset_filter)
).body
if dataset: if dataset:
dataset = dataset[0] dataset = dataset[0]
else: else:
dataset = scibec.dataset.dataset_controller_create( new_dataset = py_scibec.bec.NewDataset(
body=scibec.models.Dataset(
**{ **{
"readACL": scibec_info["activeExperiment"]["readACL"], "readACL": scibec_info["activeExperiment"].read_acl,
"writeACL": scibec_info["activeExperiment"]["readACL"], "writeACL": scibec_info["activeExperiment"].read_acl,
"owner": scibec_info["activeExperiment"]["owner"], "owner": scibec_info["activeExperiment"].owner,
"number": dataset_number, "number": dataset_number,
"experimentId": experiment_id, "experimentId": experiment_id,
"name": info.get("dataset_name", ""), "name": info.get("dataset_name", ""),
} }
) )
).body dataset = scibec.dataset.dataset_controller_create(new_dataset)
scan_data = { new_scan = py_scibec.bec.NewScan(
"readACL": scibec_info["activeExperiment"]["readACL"], **{
"writeACL": scibec_info["activeExperiment"]["readACL"], "readACL": scibec_info["activeExperiment"].read_acl,
"owner": scibec_info["activeExperiment"]["owner"], "writeACL": scibec_info["activeExperiment"].read_acl,
"owner": scibec_info["activeExperiment"].owner,
"scanType": info.get("scan_name", ""), "scanType": info.get("scan_name", ""),
"scanId": info.get("scan_id", ""), "scanId": info.get("scan_id", ""),
"queueId": info.get("queue_id", ""), "queueId": info.get("queue_id", ""),
@ -110,16 +110,19 @@ class SciBecMetadataHandler:
# "queue": info.get("stream", ""), # "queue": info.get("stream", ""),
"metadata": info, "metadata": info,
# "sessionId": session_id, # "sessionId": session_id,
"datasetId": dataset["id"], "datasetId": dataset.id,
"scanNumber": info.get("scan_number", 0), "scanNumber": info.get("scan_number", 0),
} }
scan = scibec.scan.scan_controller_create(body=scibec.models.Scan(**scan_data)).body )
scan = scibec.scan.scan_controller_create(new_scan)
# scan = scibec.add_scan(scan_data) # scan = scibec.add_scan(scan_data)
else: else:
info = msg.content["info"] info = msg.content["info"]
scan = scibec.scan.scan_controller_update_by_id( scan = scibec.scan.scan_controller_update_by_id(
path_params={"id": scan[0]["id"]}, id=scan[0].id,
body={"metadata": info, "exitStatus": msg.content["status"]}, scan_partial=py_scibec.bec.ScanPartial(
metadata=info, exitStatus=msg.content["status"]
),
) )
return scan return scan
@ -174,9 +177,8 @@ class SciBecMetadataHandler:
scibec = self.scibec_connector.scibec scibec = self.scibec_connector.scibec
if not scibec: if not scibec:
return return
scan = scibec.scan.scan_controller_find( scan_filter = py_scibec.bec.ScanFilterWhere(where={"scanId": data["metadata"]["scan_id"]})
query_params={"filter": {"where": {"scanId": data["metadata"]["scan_id"]}}} scan = scibec.scan.scan_controller_find(scan_filter)
).body
if not scan: if not scan:
logger.warning( logger.warning(
f"Could not find scan with scan_id {data['metadata']['scan_id']}. Cannot write scan" f"Could not find scan with scan_id {data['metadata']['scan_id']}. Cannot write scan"
@ -195,24 +197,24 @@ class SciBecMetadataHandler:
) )
self._write_scan_data_chunks(file_path, data_bec, scan) self._write_scan_data_chunks(file_path, data_bec, scan)
else: else:
new_scan_data = py_scibec.bec.NewScanData(
scibec.scan_data.scan_data_controller_create_many(
body=scibec.models.ScanData(
**{ **{
"readACL": scan["readACL"], "readACL": scan.read_acl,
"writeACL": scan["readACL"], "writeACL": scan.read_acl,
"owner": scan["owner"], "owner": scan.owner,
"scanId": scan["id"], "scanId": scan.id,
"filePath": file_path, "filePath": file_path,
"data": data_bec, "data": data_bec,
} }
) )
) scibec.scan_data.scan_data_controller_create_many(new_scan_data)
logger.info( logger.info(
f"Wrote scan data to SciBec for scan_id {data['metadata']['scan_id']} in {time.time() - start} seconds." f"Wrote scan data to SciBec for scan_id {data['metadata']['scan_id']} in {time.time() - start} seconds."
) )
def _write_scan_data_chunks(self, file_path: str, data_bec: dict, scan: dict): def _write_scan_data_chunks(
self, file_path: str, data_bec: dict, scan: py_scibec_models.ScanWithRelations
):
""" """
Write the scan data to SciBec in chunks. This method is called if the scan data is larger Write the scan data to SciBec in chunks. This method is called if the scan data is larger
than 1 MB. The method loops through all keys in the data dictionary and creates chunks of than 1 MB. The method loops through all keys in the data dictionary and creates chunks of
@ -234,35 +236,33 @@ class SciBecMetadataHandler:
) )
continue continue
if len(json.dumps(chunk)) + len(json.dumps({key: value})) > self.MAX_DATA_SIZE: if len(json.dumps(chunk)) + len(json.dumps({key: value})) > self.MAX_DATA_SIZE:
scibec.scan_data.scan_data_controller_create_many( new_scan_data = py_scibec.bec.NewScanData(
body=scibec.models.ScanData(
**{ **{
"readACL": scan["readACL"], "readACL": scan.read_acl,
"writeACL": scan["readACL"], "writeACL": scan.read_acl,
"owner": scan["owner"], "owner": scan.owner,
"scanId": scan["id"], "scanId": scan.id,
"filePath": file_path, "filePath": file_path,
"data": chunk, "data": chunk,
} }
) )
) scibec.scan_data.scan_data_controller_create_many(new_scan_data)
chunk = {} chunk = {}
chunk[key] = value chunk[key] = value
# Write the last chunk # Write the last chunk
if chunk: if chunk:
scibec.scan_data.scan_data_controller_create_many( new_scan_data = py_scibec.bec.NewScanData(
body=scibec.models.ScanData(
**{ **{
"readACL": scan["readACL"], "readACL": scan.read_acl,
"writeACL": scan["readACL"], "writeACL": scan.read_acl,
"owner": scan["owner"], "owner": scan.owner,
"scanId": scan["id"], "scanId": scan.id,
"filePath": file_path, "filePath": file_path,
"data": chunk, "data": chunk,
} }
) )
) scibec.scan_data.scan_data_controller_create_many(new_scan_data)
def shutdown(self): def shutdown(self):
""" """

View File

@ -1,4 +1,8 @@
import pytest import pytest
from py_scibec_openapi_client.models.beamline_with_relations import BeamlineWithRelations
from py_scibec_openapi_client.models.dataset_with_relations import DatasetWithRelations
from py_scibec_openapi_client.models.experiment_with_relations import ExperimentWithRelations
from py_scibec_openapi_client.models.scan_with_relations import ScanWithRelations
from bec_lib import bec_logger from bec_lib import bec_logger
@ -10,3 +14,36 @@ from bec_lib import bec_logger
def threads_check(threads_check): def threads_check(threads_check):
yield yield
bec_logger.logger.remove() bec_logger.logger.remove()
@pytest.fixture()
def active_experiment():
return ExperimentWithRelations(
name="dummy_experiment",
writeAccount="p12345",
id="dummy_experiment_id",
beamlineId="dummy_id",
)
@pytest.fixture()
def beamline_document():
return BeamlineWithRelations(
name="dummy_bl", activeExperiment="dummy_experiment", id="dummy_id"
)
@pytest.fixture()
def dataset_document():
return DatasetWithRelations(name="dummy_dataset", id="dummy_id")
@pytest.fixture()
def scan_document():
return ScanWithRelations(
name="dummy_scan",
id="dummy_id",
readACL=["readACL"],
writeACL=["writeACL"],
owner=["owner"],
)

View File

@ -3,7 +3,6 @@ from unittest import mock
import pytest import pytest
from bec_lib import MessageEndpoints, ServiceConfig, messages from bec_lib import MessageEndpoints, ServiceConfig, messages
from bec_lib.logger import bec_logger
from bec_lib.messages import BECStatus from bec_lib.messages import BECStatus
from bec_lib.tests.utils import ConnectorMock from bec_lib.tests.utils import ConnectorMock
from bec_server.scihub import SciHub from bec_server.scihub import SciHub
@ -114,36 +113,18 @@ def test_scibec_connect_to_scibec(SciBecMock):
mock_update_eaccount_in_redis.assert_called_once() mock_update_eaccount_in_redis.assert_called_once()
class ServerResponseMock: def test_scibec_update_experiment_info(SciBecMock, active_experiment, beamline_document):
def __init__(self, body):
self.body = body
def test_scibec_update_experiment_info(SciBecMock):
with mock.patch.object(SciBecMock, "scibec") as mock_scibec: with mock.patch.object(SciBecMock, "scibec") as mock_scibec:
beamline_document = { mock_scibec.beamline.beamline_controller_find.return_value = (beamline_document,)
"name": "dummy_bl", experiment_document = active_experiment
"activeExperiment": "dummy_experiment", mock_scibec.experiment.experiment_controller_find_by_id.return_value = experiment_document
"id": "dummy_id",
}
mock_scibec.beamline.beamline_controller_find.return_value = ServerResponseMock(
(beamline_document,)
)
experiment_document = {
"name": "dummy_experiment",
"writeAccount": "dummy_write_account",
"id": "dummy_experiment_id",
}
mock_scibec.experiment.experiment_controller_find_by_id.return_value = ServerResponseMock(
experiment_document
)
SciBecMock._update_experiment_info() SciBecMock._update_experiment_info()
assert SciBecMock.scibec_info["activeExperiment"] == experiment_document assert SciBecMock.scibec_info["activeExperiment"] == experiment_document
assert SciBecMock.scibec_info["beamline"] == beamline_document assert SciBecMock.scibec_info["beamline"] == beamline_document
def test_update_eaccount_in_redis(SciBecMock): def test_update_eaccount_in_redis(SciBecMock, active_experiment):
SciBecMock.scibec_info = {"activeExperiment": {"writeAccount": "p12345"}} SciBecMock.scibec_info = {"activeExperiment": active_experiment}
with mock.patch.object(SciBecMock, "connector") as mock_connector: with mock.patch.object(SciBecMock, "connector") as mock_connector:
SciBecMock._update_eaccount_in_redis() SciBecMock._update_eaccount_in_redis()
mock_connector.set.assert_called_once_with( mock_connector.set.assert_called_once_with(

View File

@ -2,6 +2,7 @@ from unittest import mock
import numpy as np import numpy as np
import pytest import pytest
from py_scibec_openapi_client.models import NewScanData, ScanPartial
from bec_lib import messages from bec_lib import messages
from bec_lib.redis_connector import MessageObject from bec_lib.redis_connector import MessageObject
@ -46,48 +47,33 @@ def test_update_scan_status_returns_without_scibec(md_handler):
md_handler.update_scan_status(msg) md_handler.update_scan_status(msg)
def test_update_scan_status(md_handler): def test_update_scan_status(md_handler, active_experiment, dataset_document):
# pylint: disable=protected-access # pylint: disable=protected-access
msg = messages.ScanStatusMessage(scan_id="scan_id", status="open", info={"dataset_number": 12}) msg = messages.ScanStatusMessage(scan_id="scan_id", status="open", info={"dataset_number": 12})
scibec = mock.Mock() scibec = mock.Mock()
md_handler.scibec_connector.scibec = scibec md_handler.scibec_connector.scibec = scibec
scibec_info = { scibec_info = {"activeExperiment": active_experiment}
"activeExperiment": {
"id": "id",
"readACL": ["readACL"],
"writeACL": ["writeACL"],
"owner": "owner",
}
}
md_handler.scibec_connector.scibec_info = scibec_info md_handler.scibec_connector.scibec_info = scibec_info
type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[]) scibec.scan.scan_controller_find = mock.MagicMock(return_value=[])
type(scibec.dataset.dataset_controller_find()).body = mock.PropertyMock(return_value=[]) scibec.dataset.dataset_controller_find = mock.MagicMock(return_value=[])
type(scibec.dataset.dataset_controller_create()).body = mock.PropertyMock( scibec.dataset.dataset_controller_create = mock.MagicMock(return_value=dataset_document)
return_value={"id": "id"}
)
md_handler.update_scan_status(msg) md_handler.update_scan_status(msg)
def test_update_scan_status_patch(md_handler): def test_update_scan_status_patch(md_handler, active_experiment, scan_document):
# pylint: disable=protected-access # pylint: disable=protected-access
msg = messages.ScanStatusMessage( msg = messages.ScanStatusMessage(
scan_id="scan_id", status="closed", info={"dataset_number": 12} scan_id="scan_id", status="closed", info={"dataset_number": 12}
) )
scibec = mock.Mock() scibec = mock.Mock()
md_handler.scibec_connector.scibec = scibec md_handler.scibec_connector.scibec = scibec
scibec_info = { scibec_info = {"activeExperiment": active_experiment}
"activeExperiment": {
"id": "id",
"readACL": ["readACL"],
"writeACL": ["writeACL"],
"owner": "owner",
}
}
md_handler.scibec_connector.scibec_info = scibec_info md_handler.scibec_connector.scibec_info = scibec_info
type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[{"id": "id"}]) scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document])
md_handler.update_scan_status(msg) md_handler.update_scan_status(msg)
scibec.scan.scan_controller_update_by_id.assert_called_once_with( scibec.scan.scan_controller_update_by_id.assert_called_once_with(
path_params={"id": "id"}, body={"metadata": {"dataset_number": 12}, "exitStatus": "closed"} id="dummy_id",
scan_partial=ScanPartial(exit_status="closed", metadata={"dataset_number": 12}),
) )
@ -124,31 +110,27 @@ def test_update_scan_data_without_scan(md_handler):
# pylint: disable=protected-access # pylint: disable=protected-access
scibec = mock.Mock() scibec = mock.Mock()
md_handler.scibec_connector.scibec = scibec md_handler.scibec_connector.scibec = scibec
type(scibec.scan.scan_controller_find()).body = mock.PropertyMock(return_value=[]) scibec.scan.scan_controller_find = mock.MagicMock(return_value=[])
md_handler.update_scan_data( md_handler.update_scan_data(
file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}} file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}}
) )
def test_update_scan_data(md_handler): def test_update_scan_data(md_handler, scan_document):
# pylint: disable=protected-access # pylint: disable=protected-access
scibec = mock.Mock() scibec = mock.Mock()
md_handler.scibec_connector.scibec = scibec md_handler.scibec_connector.scibec = scibec
type(scibec.scan.scan_controller_find()).body = mock.PropertyMock( scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document])
return_value=[
{"id": "id", "readACL": ["readACL"], "writeACL": ["writeACL"], "owner": "owner"}
]
)
md_handler.update_scan_data( md_handler.update_scan_data(
file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}} file_path="my_file.h5", data={"data": {}, "metadata": {"scan_id": "scan_id"}}
) )
scibec.scan_data.scan_data_controller_create_many.assert_called_once_with( scibec.scan_data.scan_data_controller_create_many.assert_called_once_with(
body=scibec.models.ScanData( NewScanData(
**{ **{
"readACL": "readACL", "readACL": ["readACL"],
"writeACL": "readACL", "writeACL": ["readACL"],
"owner": "owner", "owner": ["owner"],
"scanId": "id", "scanId": "dummy_id",
"filePath": "my_file.h5", "filePath": "my_file.h5",
"data": {"data": {}, "metadata": {"scan_id": "scan_id"}}, "data": {"data": {}, "metadata": {"scan_id": "scan_id"}},
} }
@ -156,16 +138,12 @@ def test_update_scan_data(md_handler):
) )
def test_update_scan_data_exceeding_limit(md_handler): def test_update_scan_data_exceeding_limit(md_handler, scan_document):
# pylint: disable=protected-access # pylint: disable=protected-access
scibec = mock.Mock() scibec = mock.Mock()
md_handler.MAX_DATA_SIZE = 1000 md_handler.MAX_DATA_SIZE = 1000
md_handler.scibec_connector.scibec = scibec md_handler.scibec_connector.scibec = scibec
type(scibec.scan.scan_controller_find()).body = mock.PropertyMock( scibec.scan.scan_controller_find = mock.MagicMock(return_value=[scan_document])
return_value=[
{"id": "id", "readACL": ["readACL"], "writeACL": ["writeACL"], "owner": "owner"}
]
)
data_block = {f"key_{i}": {"signal": list(range(100))} for i in range(10)} data_block = {f"key_{i}": {"signal": list(range(100))} for i in range(10)}
data_block.update({"metadata": {"scan_id": "scan_id"}}) data_block.update({"metadata": {"scan_id": "scan_id"}})
md_handler.update_scan_data(file_path="my_file.h5", data=data_block) md_handler.update_scan_data(file_path="my_file.h5", data=data_block)