refactor: changed repo structure
This commit is contained in:
@@ -0,0 +1 @@
|
||||
from .bec_client import *
|
||||
@@ -0,0 +1 @@
|
||||
from .plugins import *
|
||||
@@ -0,0 +1,245 @@
|
||||
from bec_client.scan_manager import ScanReport
|
||||
from bec_utils.devicemanager import Device
|
||||
|
||||
# pylint:disable=undefined-variable
|
||||
# pylint: disable=too-many-arguments
|
||||
|
||||
|
||||
def dscan(
|
||||
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
|
||||
) -> ScanReport:
|
||||
"""Relative line scan with one device.
|
||||
|
||||
Args:
|
||||
motor1 (Device): Device that should be scanned.
|
||||
m1_from (float): Start position relative to the current position.
|
||||
m1_to (float): End position relative to the current position.
|
||||
steps (int): Number of steps.
|
||||
exp_time (float): Exposure time.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
|
||||
"""
|
||||
return scans.line_scan(
|
||||
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
|
||||
)
|
||||
|
||||
|
||||
def d2scan(
|
||||
motor1: Device,
|
||||
m1_from: float,
|
||||
m1_to: float,
|
||||
motor2: Device,
|
||||
m2_from: float,
|
||||
m2_to: float,
|
||||
steps: int,
|
||||
exp_time: float,
|
||||
**kwargs
|
||||
) -> ScanReport:
|
||||
"""Relative line scan with two devices.
|
||||
|
||||
Args:
|
||||
motor1 (Device): First device that should be scanned.
|
||||
m1_from (float): Start position of the first device relative to its current position.
|
||||
m1_to (float): End position of the first device relative to its current position.
|
||||
motor2 (Device): Second device that should be scanned.
|
||||
m2_from (float): Start position of the second device relative to its current position.
|
||||
m2_to (float): End position of the second device relative to its current position.
|
||||
steps (int): Number of steps.
|
||||
exp_time (float): Exposure time
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
|
||||
"""
|
||||
return scans.line_scan(
|
||||
motor1,
|
||||
m1_from,
|
||||
m1_to,
|
||||
motor2,
|
||||
m2_from,
|
||||
m2_to,
|
||||
steps=steps,
|
||||
exp_time=exp_time,
|
||||
relative=True,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
|
||||
"""Absolute line scan with one device.
|
||||
|
||||
Args:
|
||||
motor1 (Device): Device that should be scanned.
|
||||
m1_from (float): Start position.
|
||||
m1_to (float): End position.
|
||||
steps (int): Number of steps.
|
||||
exp_time (float): Exposure time.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
|
||||
"""
|
||||
return scans.line_scan(
|
||||
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
|
||||
)
|
||||
|
||||
|
||||
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
|
||||
"""Absolute line scan with two devices.
|
||||
|
||||
Args:
|
||||
motor1 (Device): First device that should be scanned.
|
||||
m1_from (float): Start position of the first device.
|
||||
m1_to (float): End position of the first device.
|
||||
motor2 (Device): Second device that should be scanned.
|
||||
m2_from (float): Start position of the second device.
|
||||
m2_to (float): End position of the second device.
|
||||
steps (int): Number of steps.
|
||||
exp_time (float): Exposure time
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
|
||||
"""
|
||||
return scans.line_scan(
|
||||
motor1,
|
||||
m1_from,
|
||||
m1_to,
|
||||
motor2,
|
||||
m2_from,
|
||||
m2_to,
|
||||
steps=steps,
|
||||
exp_time=exp_time,
|
||||
relative=False,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
|
||||
"""Relative mesh scan (grid scan) with two devices.
|
||||
|
||||
Args:
|
||||
motor1 (Device): First device that should be scanned.
|
||||
m1_from (float): Start position of the first device relative to its current position.
|
||||
m1_to (float): End position of the first device relative to its current position.
|
||||
m1_steps (int): Number of steps for motor1.
|
||||
motor2 (Device): Second device that should be scanned.
|
||||
m2_from (float): Start position of the second device relative to its current position.
|
||||
m2_to (float): End position of the second device relative to its current position.
|
||||
m2_steps (int): Number of steps for motor2.
|
||||
exp_time (float): Exposure time
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
|
||||
"""
|
||||
return scans.grid_scan(
|
||||
motor1,
|
||||
m1_from,
|
||||
m1_to,
|
||||
m1_steps,
|
||||
motor2,
|
||||
m2_from,
|
||||
m2_to,
|
||||
m2_steps,
|
||||
exp_time=exp_time,
|
||||
relative=True,
|
||||
)
|
||||
|
||||
|
||||
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
|
||||
"""Absolute mesh scan (grid scan) with two devices.
|
||||
|
||||
Args:
|
||||
motor1 (Device): First device that should be scanned.
|
||||
m1_from (float): Start position of the first device.
|
||||
m1_to (float): End position of the first device.
|
||||
m1_steps (int): Number of steps for motor1.
|
||||
motor2 (Device): Second device that should be scanned.
|
||||
m2_from (float): Start position of the second device.
|
||||
m2_to (float): End position of the second device.
|
||||
m2_steps (int): Number of steps for motor2.
|
||||
exp_time (float): Exposure time
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
|
||||
"""
|
||||
return scans.grid_scan(
|
||||
motor1,
|
||||
m1_from,
|
||||
m1_to,
|
||||
m1_steps,
|
||||
motor2,
|
||||
m2_from,
|
||||
m2_to,
|
||||
m2_steps,
|
||||
exp_time=exp_time,
|
||||
relative=False,
|
||||
)
|
||||
|
||||
|
||||
def umv(*args) -> ScanReport:
|
||||
"""Updated absolute move (i.e. blocking) for one or more devices.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> umv(dev.samx, 1)
|
||||
>>> umv(dev.samx, 1, dev.samy, 2)
|
||||
"""
|
||||
return scans.umv(*args, relative=False)
|
||||
|
||||
|
||||
def umvr(*args) -> ScanReport:
|
||||
"""Updated relative move (i.e. blocking) for one or more devices.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> umvr(dev.samx, 1)
|
||||
>>> umvr(dev.samx, 1, dev.samy, 2)
|
||||
"""
|
||||
return scans.umv(*args, relative=True)
|
||||
|
||||
|
||||
def mv(*args) -> ScanReport:
|
||||
"""Absolute move for one or more devices.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> mv(dev.samx, 1)
|
||||
>>> mv(dev.samx, 1, dev.samy, 2)
|
||||
"""
|
||||
return scans.mv(*args, relative=False)
|
||||
|
||||
|
||||
def mvr(*args) -> ScanReport:
|
||||
"""Relative move for one or more devices.
|
||||
|
||||
Returns:
|
||||
ScanReport: Status object.
|
||||
|
||||
Examples:
|
||||
>>> mvr(dev.samx, 1)
|
||||
>>> mvr(dev.samx, 1, dev.samy, 2)
|
||||
"""
|
||||
return scans.mv(*args, relative=True)
|
||||
@@ -0,0 +1 @@
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
"""
|
||||
Post startup script for the BEC client. This script is executed after the
|
||||
IPython shell is started. It is used to load the beamline specific
|
||||
information and to setup the prompts.
|
||||
|
||||
The script is executed in the global namespace of the IPython shell. This
|
||||
means that all variables defined here are available in the shell.
|
||||
|
||||
If needed, bec command-line arguments can be parsed here. For example, to
|
||||
parse the --session argument, add the following lines to the script:
|
||||
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--session", help="Session name", type=str, default="my_default_session")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.session == "my_session":
|
||||
print("Loading my_session session")
|
||||
from bec_plugins.bec_client.plugins.my_session import *
|
||||
else:
|
||||
print("Loading default session")
|
||||
from bec_plugins.bec_client.plugins.default_session import *
|
||||
"""
|
||||
|
||||
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
|
||||
import argparse
|
||||
|
||||
from bec_lib.core import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
logger.info("Using the Debye startup script.")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--session", help="Session name", type=str, default="Debye")
|
||||
args = parser.parse_args()
|
||||
|
||||
# SETUP BEAMLINE INFO
|
||||
from bec_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
|
||||
|
||||
bec._beamline_mixin._bl_info_register(SLSInfo)
|
||||
bec._beamline_mixin._bl_info_register(OperatorInfo)
|
||||
|
||||
# SETUP PROMPTS
|
||||
bec._ip.prompts.username = "Debye"
|
||||
bec._ip.prompts.status = 1
|
||||
@@ -0,0 +1,25 @@
|
||||
"""
|
||||
Pre-startup script for BEC client. This script is executed before the BEC client
|
||||
is started. It can be used to set up the BEC client configuration. The script is
|
||||
executed in the global namespace of the BEC client. This means that all
|
||||
variables defined here are available in the BEC client.
|
||||
|
||||
To set up the BEC client configuration, use the ServiceConfig class. For example,
|
||||
to set the configuration file path, add the following lines to the script:
|
||||
|
||||
import pathlib
|
||||
from bec_lib.core import ServiceConfig
|
||||
|
||||
current_path = pathlib.Path(__file__).parent.resolve()
|
||||
CONFIG_PATH = f"{current_path}/<path_to_my_config_file.yaml>"
|
||||
|
||||
config = ServiceConfig(CONFIG_PATH)
|
||||
|
||||
If this startup script defined a ServiceConfig object, the BEC client will use
|
||||
it to configure itself. Otherwise, the BEC client will use the default config.
|
||||
"""
|
||||
|
||||
# example:
|
||||
# current_path = pathlib.Path(__file__).parent.resolve()
|
||||
# CONFIG_PATH = f"{current_path}/../../../bec_config.yaml"
|
||||
# config = ServiceConfig(CONFIG_PATH)
|
||||
+34
-27
@@ -16,6 +16,7 @@ from bec_lib.core import (
|
||||
)
|
||||
from bec_lib.core.bec_service import BECService
|
||||
from bec_lib.core.file_utils import FileWriterMixin
|
||||
from bec_lib.core.redis_connector import MessageObject
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -59,7 +60,7 @@ class NIDAQWriterService(BECService):
|
||||
self._scan_status_consumer.start()
|
||||
|
||||
@staticmethod
|
||||
def _scan_status_callback(message: BECMessage, parent: NIDAQWriterService) -> None:
|
||||
def _scan_status_callback(message: MessageObject, parent: NIDAQWriterService) -> None:
|
||||
"""
|
||||
Callback for scan status messages.
|
||||
"""
|
||||
@@ -74,7 +75,7 @@ class NIDAQWriterService(BECService):
|
||||
"""
|
||||
|
||||
self._ni_data_event = threading.Event()
|
||||
self._ni_data_consumer = threading.Thread(target=self._read_data, daemon=True)
|
||||
self._ni_data_consumer = threading.Thread(target=self._run_read_loop, daemon=True)
|
||||
self._ni_data_consumer.start()
|
||||
|
||||
def _start_ni_writer(self) -> None:
|
||||
@@ -85,38 +86,44 @@ class NIDAQWriterService(BECService):
|
||||
self._ni_writer = threading.Thread(target=self._write_data, daemon=True)
|
||||
self._ni_writer.start()
|
||||
|
||||
def _run_read_loop(self) -> None:
|
||||
"""
|
||||
Run the read loop.
|
||||
"""
|
||||
while not self._ni_data_event.is_set():
|
||||
self._read_data()
|
||||
|
||||
def _read_data(self):
|
||||
"""
|
||||
Read data from Redis.
|
||||
"""
|
||||
while not self._ni_data_event.is_set():
|
||||
if not self.scan_is_running:
|
||||
time.sleep(0.01)
|
||||
continue
|
||||
if not self.scan_is_running:
|
||||
time.sleep(0.01)
|
||||
return
|
||||
|
||||
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
|
||||
self.filename = self.writer_mixin.compile_full_filename(self.scan_number, "ni.h5")
|
||||
|
||||
start_time = time.time()
|
||||
if self.use_redis_stream:
|
||||
msg = self.producer.xread("ni_data")
|
||||
start_time = time.time()
|
||||
if self.use_redis_stream:
|
||||
msg = self.producer.xread("ni_data")
|
||||
|
||||
if msg:
|
||||
num_msgs = len(msg[0][1])
|
||||
print(f"Received {num_msgs} messages in {time.time() - start_time} seconds")
|
||||
msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds")
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
msgs = self.producer.r.lpop("ni_data:val", 20)
|
||||
time.sleep(0.001)
|
||||
if msgs:
|
||||
msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
|
||||
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
if msg:
|
||||
num_msgs = len(msg[0][1])
|
||||
print(f"Received {num_msgs} messages in {time.time() - start_time} seconds")
|
||||
msgs = [BECMessage.DeviceMessage.loads(m[1][b"device_msg"]) for m in msg[0][1]]
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {num_msgs} messages in {time.time() - start_time} seconds")
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
msgs = self.producer.r.lpop("ni_data:val", 20)
|
||||
time.sleep(0.001)
|
||||
if msgs:
|
||||
msgs = [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
|
||||
print(f"Received {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
start_time = time.time()
|
||||
self.handle_ni_data(msgs)
|
||||
print(f"Handled {len(msgs)} messages in {time.time() - start_time} seconds")
|
||||
|
||||
def handle_scan_status(self, msg: BECMessage.ScanStatusMessage) -> None:
|
||||
"""
|
||||
@@ -0,0 +1,11 @@
|
||||
# This file is used to select the BEC and Ophyd Devices version for the auto deployment process.
|
||||
# Do not edit this file unless you know what you are doing!
|
||||
|
||||
# The version can be a git tag, branch or commit hash.
|
||||
|
||||
# BEC version to use
|
||||
BEC_AUTODEPLOY_VERSION="master"
|
||||
|
||||
# ophyd_devices version to use
|
||||
OPHYD_DEVICES_AUTODEPLOY_VERSION="master"
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
redis:
|
||||
host: localhost
|
||||
port: 6379
|
||||
mongodb:
|
||||
host: localhost
|
||||
port: 27017
|
||||
scibec:
|
||||
host: http://[::1]
|
||||
port: 3030
|
||||
beamline: "DEBYE"
|
||||
service_config:
|
||||
general:
|
||||
reset_queue_on_cancel: True
|
||||
enforce_ACLs: False
|
||||
file_writer:
|
||||
plugin: default_NeXus_format
|
||||
base_path: ./
|
||||
|
||||
Executable
+28
@@ -0,0 +1,28 @@
|
||||
# deployment script to be translated to Ansible
|
||||
|
||||
# can be removed once we have the autodeployment in place
|
||||
BEAMLINE_REPO=gitlab.psi.ch:bec/debye-bec.git
|
||||
git clone git@$BEAMLINE_REPO
|
||||
|
||||
module add psi-python38/2020.11
|
||||
|
||||
# start redis
|
||||
docker run --network=host --name redis-bec -d redis
|
||||
# alternative:
|
||||
# conda install -y redis; redis-server &
|
||||
|
||||
|
||||
# get the target versions for ophyd_devices and BEC
|
||||
source ./debye-bec/deployment/autodeploy_versions
|
||||
|
||||
git clone -b $OPHYD_DEVICES_AUTODEPLOY_VERSION https://gitlab.psi.ch/bec/ophyd_devices.git
|
||||
git clone -b $BEC_AUTODEPLOY_VERSION https://gitlab.psi.ch/bec/bec.git
|
||||
|
||||
# install BEC
|
||||
cd bec
|
||||
source ./bin/install_bec_dev.sh
|
||||
|
||||
cd ../
|
||||
# start the BEC server
|
||||
bec-server start --config ./debye-bec/deployment/bec-server-config.yaml
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
[metadata]
|
||||
name = bec_plugins
|
||||
description = BEC plugins to modify the behaviour of services within the BEC framework
|
||||
long_description = file: README.md
|
||||
long_description_content_type = text/markdown
|
||||
url = https://gitlab.psi.ch/bec/bec
|
||||
project_urls =
|
||||
Bug Tracker = https://gitlab.psi.ch/bec/bec/issues
|
||||
classifiers =
|
||||
Programming Language :: Python :: 3
|
||||
Development Status :: 3 - Alpha
|
||||
Topic :: Scientific/Engineering
|
||||
|
||||
[options]
|
||||
package_dir =
|
||||
= .
|
||||
packages = find:
|
||||
python_requires = >=3.8
|
||||
|
||||
[options.packages.find]
|
||||
where = .
|
||||
@@ -0,0 +1,7 @@
|
||||
from setuptools import setup
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup(
|
||||
install_requires=[],
|
||||
extras_require={"dev": ["pytest", "pytest-random-order", "coverage"]},
|
||||
)
|
||||
@@ -0,0 +1,171 @@
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from bec_lib.core import BECMessage, MessageEndpoints, ServiceConfig
|
||||
from bec_lib.core.redis_connector import MessageObject
|
||||
from bec_lib.core.tests.utils import ConnectorMock
|
||||
|
||||
from bec_plugins.services.NIDAQ_writer import NIDAQWriterService
|
||||
|
||||
|
||||
def test_nidaq_starts_consumers():
|
||||
with mock.patch.object(NIDAQWriterService, "_start_scan_status_consumer") as mock_scan_start:
|
||||
with mock.patch.object(NIDAQWriterService, "_start_ni_data_consumer") as mock_data_start:
|
||||
with mock.patch.object(NIDAQWriterService, "_start_ni_writer") as mock_writer_start:
|
||||
NIDAQWriterService(
|
||||
config=ServiceConfig(redis={"host": "test", "port": 6379}),
|
||||
connector_cls=mock.MagicMock(),
|
||||
)
|
||||
mock_scan_start.assert_called_once()
|
||||
mock_data_start.assert_called_once()
|
||||
mock_writer_start.assert_called_once()
|
||||
|
||||
|
||||
class NIWriterMock(NIDAQWriterService):
|
||||
def _start_ni_data_consumer(self) -> None:
|
||||
pass
|
||||
|
||||
def _start_ni_writer(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def nidaq():
|
||||
service = NIWriterMock(
|
||||
config=ServiceConfig(redis={"host": "test", "port": 6379}),
|
||||
connector_cls=mock.MagicMock(),
|
||||
)
|
||||
yield service
|
||||
|
||||
|
||||
def test_nidaq_scan_status_consumer(nidaq):
|
||||
nidaq.connector.consumer.assert_called_once_with(
|
||||
MessageEndpoints.scan_status(), cb=nidaq._scan_status_callback, parent=nidaq
|
||||
)
|
||||
nidaq._scan_status_consumer.start.assert_called_once()
|
||||
|
||||
|
||||
def test_scan_status_callback(nidaq):
|
||||
scan_status_msg = BECMessage.ScanStatusMessage(scanID="test", status="open", info={})
|
||||
msg_obj = MessageObject(
|
||||
topic="test",
|
||||
value=scan_status_msg.dumps(),
|
||||
)
|
||||
with mock.patch.object(nidaq, "handle_scan_status") as mock_handle:
|
||||
nidaq._scan_status_callback(msg_obj, nidaq)
|
||||
mock_handle.assert_called_once_with(scan_status_msg)
|
||||
|
||||
|
||||
def test_nidaq_doesnt_read_data_when_scan_is_not_running(nidaq):
|
||||
nidaq.scan_is_running = False
|
||||
with mock.patch.object(nidaq, "writer_mixin") as mock_writer:
|
||||
nidaq._read_data()
|
||||
mock_writer.compile_full_filename.assert_not_called()
|
||||
|
||||
|
||||
def test_nidaq_reads_data(nidaq):
|
||||
nidaq.scan_is_running = True
|
||||
nidaq.use_redis_stream = False
|
||||
with mock.patch.object(nidaq, "writer_mixin") as mock_writer:
|
||||
with mock.patch.object(nidaq, "handle_ni_data") as mock_handle:
|
||||
nidaq._read_data()
|
||||
mock_writer.compile_full_filename.assert_called_once()
|
||||
mock_handle.assert_called_once()
|
||||
|
||||
|
||||
def test_nidaq_reads_data_from_strea(nidaq):
|
||||
nidaq.scan_is_running = True
|
||||
nidaq.use_redis_stream = True
|
||||
with mock.patch.object(nidaq, "writer_mixin") as mock_writer:
|
||||
with mock.patch.object(nidaq, "handle_ni_data") as mock_handle:
|
||||
nidaq._read_data()
|
||||
mock_writer.compile_full_filename.assert_called_once()
|
||||
mock_handle.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("scan_status", ["open", "closed", "aborted", "halted", None])
|
||||
def test_nidaq_handle_scan_status(nidaq, scan_status):
|
||||
scan_status_msg = BECMessage.ScanStatusMessage(
|
||||
scanID="test", status=scan_status, info={"scan_number": 5}
|
||||
)
|
||||
nidaq.handle_scan_status(scan_status_msg)
|
||||
if scan_status == "open":
|
||||
assert nidaq.scan_is_running
|
||||
assert nidaq.scan_number == 5
|
||||
else:
|
||||
assert not nidaq.scan_is_running
|
||||
assert nidaq.scan_number is None
|
||||
|
||||
|
||||
def test_nidaq_handle_ni_data(nidaq):
|
||||
data = [
|
||||
BECMessage.DeviceMessage(signals={"signal1": list(range(10)), "signal2": list(range(10))}),
|
||||
BECMessage.DeviceMessage(
|
||||
signals={"signal1": list(range(10, 20)), "signal2": list(range(10, 20))}
|
||||
),
|
||||
]
|
||||
|
||||
nidaq.handle_ni_data(data)
|
||||
signal = nidaq.queue.get()
|
||||
assert all(signal["signal1"] == np.asarray(range(20)))
|
||||
assert all(signal["signal2"] == np.asarray(range(20)))
|
||||
|
||||
|
||||
def test_nidaq_write_data_without_filename(nidaq):
|
||||
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
|
||||
|
||||
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
|
||||
nidaq.write_data(signal)
|
||||
mock_h5py.File.assert_not_called()
|
||||
|
||||
|
||||
def test_nidaq_write_data_with_filename(nidaq):
|
||||
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
|
||||
nidaq.filename = "test.h5"
|
||||
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
|
||||
nidaq.write_data(signal)
|
||||
mock_h5py.File.assert_called_once_with("test.h5", "a")
|
||||
|
||||
|
||||
def test_nidaq_write_data_reshape(nidaq):
|
||||
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
|
||||
nidaq.filename = "test.h5"
|
||||
nidaq.reshape_dataset = True
|
||||
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
|
||||
nidaq.write_data(signal)
|
||||
mock_h5py.File.assert_called_once_with("test.h5", "a")
|
||||
|
||||
|
||||
def test_nidaq_write_data_without_reshape(nidaq):
|
||||
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
|
||||
nidaq.filename = "test.h5"
|
||||
nidaq.reshape_dataset = False
|
||||
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
|
||||
nidaq.write_data(signal)
|
||||
mock_h5py.File.assert_called_once_with("test.h5", "a")
|
||||
file_handle = mock_h5py.File().__enter__()
|
||||
file_handle.create_group.assert_called_once_with("dataset_0")
|
||||
calls = file_handle.create_group().create_dataset.call_args_list
|
||||
assert calls[0] == mock.call(
|
||||
"signal1", data=signal["signal1"], chunks=True, maxshape=(None,)
|
||||
)
|
||||
assert calls[1] == mock.call(
|
||||
"signal2", data=signal["signal2"], chunks=True, maxshape=(None,)
|
||||
)
|
||||
file_handle.keys.return_value = ["dataset_0"]
|
||||
nidaq.write_data(signal)
|
||||
assert mock.call("dataset_1") in file_handle.create_group.call_args_list
|
||||
|
||||
|
||||
def test_nidaq_write_data_reshapes_data(nidaq):
|
||||
signal = {"signal1": np.asarray(range(20)), "signal2": np.asarray(range(20))}
|
||||
nidaq.filename = "test.h5"
|
||||
nidaq.reshape_dataset = True
|
||||
with mock.patch("bec_plugins.services.NIDAQ_writer.NIDAQ_writer.h5py") as mock_h5py:
|
||||
file_handle = mock_h5py.File().__enter__()
|
||||
file_handle.__contains__.side_effect = signal.__contains__
|
||||
nidaq.write_data(signal)
|
||||
dataset = file_handle["signal1"]
|
||||
assert len(dataset.resize.call_args_list) == 2
|
||||
assert mock.call("test.h5", "a") in mock_h5py.File.call_args_list
|
||||
Reference in New Issue
Block a user