refactor: fix bec_lib imports

This commit is contained in:
usov_i 2023-11-13 08:28:09 +01:00
parent 3c173f59f7
commit d851cf6f8e
23 changed files with 74 additions and 80 deletions

View File

@ -2,7 +2,7 @@ import os
import time import time
from typing import List from typing import List
from bec_lib.core import BECMessage, MessageEndpoints, bec_logger from bec_lib import messages, MessageEndpoints, bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Signal from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO, Signal
@ -81,9 +81,9 @@ class Eiger1p5MDetector(Device):
self.metadata = {} self.metadata = {}
self.username = "e20588" # TODO get from config self.username = "e20588" # TODO get from config
def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage: def _get_current_scan_msg(self) -> messages.ScanStatusMessage:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
return BECMessage.ScanStatusMessage.loads(msg) return messages.ScanStatusMessage.loads(msg)
def _get_scan_dir(self, scan_bundle, scan_number, leading_zeros=None): def _get_scan_dir(self, scan_bundle, scan_number, leading_zeros=None):
if leading_zeros is None: if leading_zeros is None:
@ -159,7 +159,7 @@ class Eiger1p5MDetector(Device):
break break
self.detector_control.put("stop") self.detector_control.put("stop")
signals = {"config": self.read(), "data": self.file_name} signals = {"config": self.read(), "data": self.file_name}
msg = BECMessage.DeviceMessage(signals=signals, metadata=self.metadata) msg = messages.DeviceMessage(signals=signals, metadata=self.metadata)
self.device_manager.producer.set_and_publish( self.device_manager.producer.set_and_publish(
MessageEndpoints.device_read(self.name), msg.dumps() MessageEndpoints.device_read(self.name), msg.dumps()
) )

View File

@ -13,7 +13,7 @@ from ophyd.pseudopos import (
from ophyd_devices.utils.socket import data_shape, data_type from ophyd_devices.utils.socket import data_shape, data_type
from ophyd_devices.utils import bec_utils as bec_utils from ophyd_devices.utils import bec_utils as bec_utils
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin

View File

@ -6,7 +6,7 @@ import traceback
from collections import OrderedDict from collections import OrderedDict
from typing import Any from typing import Any
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import FormattedComponent as FCpt from ophyd import FormattedComponent as FCpt

View File

@ -1,7 +1,6 @@
import os import os
from bec_lib.core import DeviceManagerBase, BECMessage, MessageEndpoints from bec_lib import DeviceManagerBase, messages, MessageEndpoints, bec_logger
from bec_lib.core import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
@ -82,18 +81,18 @@ class BecScaninfoMixin:
"""Change BECInfoMsg object""" """Change BECInfoMsg object"""
self.bec_info_msg = bec_info_msg self.bec_info_msg = bec_info_msg
def _get_current_scan_msg(self) -> BECMessage.ScanStatusMessage: def _get_current_scan_msg(self) -> messages.ScanStatusMessage:
"""Get current scan message """Get current scan message
Returns: Returns:
BECMessage.ScanStatusMessage: BECMessage.ScanStatusMessage object messages.ScanStatusMessage: messages.ScanStatusMessage object
""" """
if not self.sim_mode: if not self.sim_mode:
# TODO what if no scan info is there yet! # TODO what if no scan info is there yet!
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
return BECMessage.ScanStatusMessage.loads(msg) return messages.ScanStatusMessage.loads(msg)
return BECMessage.ScanStatusMessage( return messages.ScanStatusMessage(
scanID="1", scanID="1",
status={}, status={},
info=self.bec_info_msg, info=self.bec_info_msg,

View File

@ -1,7 +1,6 @@
import enum import enum
import time import time
import threading import threading
from bec_lib.core.devicemanager import DeviceStatus
import numpy as np import numpy as np
import os import os
@ -13,10 +12,10 @@ from ophyd import ADComponent as ADCpt
from std_daq_client import StdDaqClient from std_daq_client import StdDaqClient
from bec_lib.core import BECMessage, MessageEndpoints, threadlocked from bec_lib import messages, MessageEndpoints, threadlocked, bec_logger
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.bec_service import SERVICE_CONFIG
from bec_lib.core import bec_logger from bec_lib.devicemanager import DeviceStatus
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.file_utils import FileWriterMixin
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
@ -399,9 +398,9 @@ class Eiger9McSAXS(DetectorBase):
""" """
pipe = self._producer.pipeline() pipe = self._producer.pipeline()
if successful is None: if successful is None:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done) msg = messages.FileMessage(file_path=self.filepath, done=done)
else: else:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish( self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
) )

View File

@ -3,17 +3,16 @@ import os
import time import time
from typing import List from typing import List
from bec_lib.core.devicemanager import DeviceStatus
from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Component as Cpt
from ophyd.mca import EpicsMCARecord from ophyd.mca import EpicsMCARecord
from ophyd import Device from ophyd import Device
from bec_lib.core.file_utils import FileWriterMixin from bec_lib import MessageEndpoints, messages, bec_logger
from bec_lib.core import MessageEndpoints, BECMessage from bec_lib.file_utils import FileWriterMixin
from bec_lib.core import bec_logger from bec_lib.devicemanager import DeviceStatus
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.bec_service import SERVICE_CONFIG
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
@ -390,9 +389,9 @@ class FalconcSAXS(Device):
""" """
pipe = self._producer.pipeline() pipe = self._producer.pipeline()
if successful is None: if successful is None:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done) msg = messages.FileMessage(file_path=self.filepath, done=done)
else: else:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish( self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
) )

View File

@ -12,12 +12,10 @@ from ophyd.scaler import ScalerCH
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
from ophyd_devices.utils import bec_utils from ophyd_devices.utils import bec_utils
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints, bec_logger, threadlocked
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.file_utils import FileWriterMixin
from collections import defaultdict from collections import defaultdict
from bec_lib.core import bec_logger, threadlocked
logger = bec_logger.logger logger = bec_logger.logger
@ -277,7 +275,7 @@ class McsCsaxs(SIS38XX):
"num_lines": self.num_lines.get(), "num_lines": self.num_lines.get(),
} }
) )
msg = BECMessage.DeviceMessage( msg = messages.DeviceMessage(
signals=dict(self.mca_data), signals=dict(self.mca_data),
metadata=self.scaninfo.scan_msg.metadata, metadata=self.scaninfo.scan_msg.metadata,
).dumps() ).dumps()
@ -333,7 +331,7 @@ class McsCsaxs(SIS38XX):
self._prep_det() self._prep_det()
self._prep_readout() self._prep_readout()
# msg = BECMessage.FileMessage(file_path=self.filepath, done=False) # msg = messages.FileMessage(file_path=self.filepath, done=False)
# self._producer.set_and_publish( # self._producer.set_and_publish(
# MessageEndpoints.public_file(self.scaninfo.scanID, "mcs_csaxs"), # MessageEndpoints.public_file(self.scaninfo.scanID, "mcs_csaxs"),
# msg.dumps(), # msg.dumps(),

View File

@ -2,7 +2,6 @@ import enum
import json import json
import os import os
import time import time
from bec_lib.core.devicemanager import DeviceStatus
import requests import requests
import numpy as np import numpy as np
@ -12,10 +11,10 @@ from ophyd import EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from ophyd import DetectorBase, Device, Staged from ophyd import DetectorBase, Device, Staged
from ophyd import ADComponent as ADCpt from ophyd import ADComponent as ADCpt
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints, bec_logger
from bec_lib.core.file_utils import FileWriterMixin from bec_lib.file_utils import FileWriterMixin
from bec_lib.core.bec_service import SERVICE_CONFIG from bec_lib.bec_service import SERVICE_CONFIG
from bec_lib.core import bec_logger from bec_lib.devicemanager import DeviceStatus
from ophyd_devices.utils import bec_utils as bec_utils from ophyd_devices.utils import bec_utils as bec_utils
from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin from ophyd_devices.epics.devices.bec_scaninfo_mixin import BecScaninfoMixin
@ -448,9 +447,9 @@ class PilatuscSAXS(DetectorBase):
""" """
pipe = self._producer.pipeline() pipe = self._producer.pipeline()
if successful is None: if successful is None:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done) msg = messages.FileMessage(file_path=self.filepath, done=done)
else: else:
msg = BECMessage.FileMessage(file_path=self.filepath, done=done, successful=successful) msg = messages.FileMessage(file_path=self.filepath, done=done, successful=successful)
self._producer.set_and_publish( self._producer.set_and_publish(
MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe MessageEndpoints.public_file(self.scaninfo.scanID, self.name), msg.dumps(), pipe=pipe
) )
@ -495,7 +494,7 @@ class PilatuscSAXS(DetectorBase):
def _start_h5converter(self, done=False) -> None: def _start_h5converter(self, done=False) -> None:
"""Start the h5converter""" """Start the h5converter"""
msg = BECMessage.FileMessage( msg = messages.FileMessage(
file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath} file_path=self.filepath_raw, done=done, metadata={"input_path": self.filepath}
) )
self._producer.set_and_publish( self._producer.set_and_publish(

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt, DeviceStatus from ophyd import Component as Cpt, DeviceStatus
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import BECMessage, MessageEndpoints, bec_logger from bec_lib import messages, MessageEndpoints, bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait
@ -311,7 +311,7 @@ class RtLamniController(Controller):
flyer_info = self._get_flyer_device_info() flyer_info = self._get_flyer_device_info()
self.get_device_manager().producer.set( self.get_device_manager().producer.set(
MessageEndpoints.device_info("rt_scan"), MessageEndpoints.device_info("rt_scan"),
BECMessage.DeviceInfoMessage(device="rt_scan", info=flyer_info).dumps(), messages.DeviceInfoMessage(device="rt_scan", info=flyer_info).dumps(),
) )
def _get_flyer_device_info(self) -> dict: def _get_flyer_device_info(self) -> dict:
@ -388,7 +388,7 @@ class RtLamniController(Controller):
# error # error
self.get_device_manager().producer.set_and_publish( self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata device="rt_scan", status=1, metadata=self.readout_metadata
).dumps(), ).dumps(),
) )
@ -423,7 +423,7 @@ class RtLamniController(Controller):
self.get_device_manager().producer.set_and_publish( self.get_device_manager().producer.set_and_publish(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata device="rt_scan", status=0, metadata=self.readout_metadata
).dumps(), ).dumps(),
) )
@ -435,7 +435,7 @@ class RtLamniController(Controller):
def publish_device_data(self, signals, pointID): def publish_device_data(self, signals, pointID):
self.get_device_manager().producer.send( self.get_device_manager().producer.send(
MessageEndpoints.device_read("rt_lamni"), MessageEndpoints.device_read("rt_lamni"),
BECMessage.DeviceMessage( messages.DeviceMessage(
signals=signals, signals=signals,
metadata={"pointID": pointID, **self.readout_metadata}, metadata={"pointID": pointID, **self.readout_metadata},
).dumps(), ).dumps(),

View File

@ -5,7 +5,7 @@ import warnings
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import BECMessage, MessageEndpoints, bec_logger from bec_lib import messages, MessageEndpoints, bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Signal from ophyd import Device, DeviceStatus, OphydObject, PositionerBase, Signal
from ophyd.sim import EnumSignal, SynSignal, _ReadbackSignal, _SetpointSignal from ophyd.sim import EnumSignal, SynSignal, _ReadbackSignal, _SetpointSignal
@ -301,7 +301,7 @@ class SynSLSDetector(Device):
def stage(self) -> List[object]: def stage(self) -> List[object]:
msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) msg = self.device_manager.producer.get(MessageEndpoints.scan_status())
scan_msg = BECMessage.ScanStatusMessage.loads(msg) scan_msg = messages.ScanStatusMessage.loads(msg)
self.metadata = { self.metadata = {
"scanID": scan_msg.content["scanID"], "scanID": scan_msg.content["scanID"],
"RID": scan_msg.content["info"]["RID"], "RID": scan_msg.content["info"]["RID"],
@ -316,7 +316,7 @@ class SynSLSDetector(Device):
def unstage(self) -> List[object]: def unstage(self) -> List[object]:
signals = {"config": self.sim_state, "data": self.file_name} signals = {"config": self.sim_state, "data": self.file_name}
msg = BECMessage.DeviceMessage(signals=signals, metadata=self.metadata) msg = messages.DeviceMessage(signals=signals, metadata=self.metadata)
self.device_manager.producer.set_and_publish( self.device_manager.producer.set_and_publish(
MessageEndpoints.device_read(self.name), msg.dumps() MessageEndpoints.device_read(self.name), msg.dumps()
) )
@ -427,10 +427,10 @@ class SynFlyer(Device, PositionerBase):
def produce_data(device, metadata): def produce_data(device, metadata):
buffer_time = 0.2 buffer_time = 0.2
elapsed_time = 0 elapsed_time = 0
bundle = BECMessage.BundleMessage() bundle = messages.BundleMessage()
for ii in range(num_pos): for ii in range(num_pos):
bundle.append( bundle.append(
BECMessage.DeviceMessage( messages.DeviceMessage(
signals={ signals={
self.name: { self.name: {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0}, "flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
@ -447,10 +447,10 @@ class SynFlyer(Device, PositionerBase):
device.device_manager.producer.send( device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps() MessageEndpoints.device_read(device.name), bundle.dumps()
) )
bundle = BECMessage.BundleMessage() bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish( device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name), MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device=device.name, device=device.name,
status=1, status=1,
metadata={"pointID": ii, **metadata}, metadata={"pointID": ii, **metadata},
@ -461,7 +461,7 @@ class SynFlyer(Device, PositionerBase):
) )
device.device_manager.producer.set_and_publish( device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name), MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device=device.name, device=device.name,
status=0, status=0,
metadata={"pointID": num_pos, **metadata}, metadata={"pointID": num_pos, **metadata},
@ -531,10 +531,10 @@ class SynFlyerLamNI(Device, PositionerBase):
def produce_data(device, metadata): def produce_data(device, metadata):
buffer_time = 0.2 buffer_time = 0.2
elapsed_time = 0 elapsed_time = 0
bundle = BECMessage.BundleMessage() bundle = messages.BundleMessage()
for ii in range(num_pos): for ii in range(num_pos):
bundle.append( bundle.append(
BECMessage.DeviceMessage( messages.DeviceMessage(
signals={ signals={
"syn_flyer_lamni": { "syn_flyer_lamni": {
"flyer_samx": {"value": positions[ii, 0], "timestamp": 0}, "flyer_samx": {"value": positions[ii, 0], "timestamp": 0},
@ -551,10 +551,10 @@ class SynFlyerLamNI(Device, PositionerBase):
device.device_manager.producer.send( device.device_manager.producer.send(
MessageEndpoints.device_read(device.name), bundle.dumps() MessageEndpoints.device_read(device.name), bundle.dumps()
) )
bundle = BECMessage.BundleMessage() bundle = messages.BundleMessage()
device.device_manager.producer.set_and_publish( device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name), MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device=device.name, device=device.name,
status=1, status=1,
metadata={"pointID": ii, **metadata}, metadata={"pointID": ii, **metadata},
@ -565,7 +565,7 @@ class SynFlyerLamNI(Device, PositionerBase):
) )
device.device_manager.producer.set_and_publish( device.device_manager.producer.set_and_publish(
MessageEndpoints.device_status(device.name), MessageEndpoints.device_status(device.name),
BECMessage.DeviceStatusMessage( messages.DeviceStatusMessage(
device=device.name, device=device.name,
status=0, status=0,
metadata={"pointID": num_pos, **metadata}, metadata={"pointID": num_pos, **metadata},

View File

@ -2,7 +2,7 @@ import threading
import time import time
import numpy as np import numpy as np
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, Kind, Signal from ophyd import Device, Kind, Signal
from ophyd.flyers import FlyerInterface from ophyd.flyers import FlyerInterface
@ -355,7 +355,7 @@ class SynXtremeOtfReplay(FlyerInterface, Device):
"timestamp": timestamp, "timestamp": timestamp,
}, },
} }
msg = BECMessage.DeviceMessage( msg = messages.DeviceMessage(
signals=signals, metadata=self._device_manager.devices.otf.metadata signals=signals, metadata=self._device_manager.devices.otf.metadata
).dumps() ).dumps()
self._device_manager.producer.set_and_publish( self._device_manager.producer.set_and_publish(

View File

@ -4,7 +4,7 @@ import time
from typing import List from typing import List
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait from ophyd.status import wait as status_wait

View File

@ -1,7 +1,7 @@
import time import time
from bec_lib.core import bec_logger from bec_lib import bec_logger
from bec_lib.core.devicemanager import DeviceContainer from bec_lib.devicemanager import DeviceContainer
from ophyd import Signal, Kind from ophyd import Signal, Kind

View File

@ -1,7 +1,7 @@
import functools import functools
import threading import threading
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Device from ophyd import Device
from ophyd.ophydobj import OphydObject from ophyd.ophydobj import OphydObject

View File

@ -6,7 +6,7 @@ import time
import typing import typing
import numpy as np import numpy as np
from bec_lib.core import bec_logger from bec_lib import bec_logger
from ophyd import Signal from ophyd import Signal
from ophyd.utils.errors import DisconnectedError from ophyd.utils.errors import DisconnectedError

View File

@ -4,7 +4,7 @@ from unittest import mock
import ophyd import ophyd
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints
from ophyd_devices.epics.devices.eiger9m_csaxs import Eiger9McSAXS from ophyd_devices.epics.devices.eiger9m_csaxs import Eiger9McSAXS
from tests.utils import DMMock, MockPV from tests.utils import DMMock, MockPV
@ -413,9 +413,9 @@ def test_publish_file_location(mock_det, scaninfo):
mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"]) mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"])
if scaninfo["successful"] is None: if scaninfo["successful"] is None:
msg = BECMessage.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps()
else: else:
msg = BECMessage.FileMessage( msg = messages.FileMessage(
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
).dumps() ).dumps()
expected_calls = [ expected_calls = [

View File

@ -7,7 +7,7 @@ import ophyd
from ophyd_devices.epics.devices.falcon_csaxs import FalconcSAXS, FalconTimeoutError from ophyd_devices.epics.devices.falcon_csaxs import FalconcSAXS, FalconTimeoutError
from tests.utils import DMMock, MockPV from tests.utils import DMMock, MockPV
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints
def patch_dual_pvs(device): def patch_dual_pvs(device):
@ -210,9 +210,9 @@ def test_publish_file_location(mock_det, scaninfo):
mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"]) mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"])
if scaninfo["successful"] is None: if scaninfo["successful"] is None:
msg = BECMessage.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps()
else: else:
msg = BECMessage.FileMessage( msg = messages.FileMessage(
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
).dumps() ).dumps()
expected_calls = [ expected_calls = [

View File

@ -5,7 +5,7 @@ from unittest import mock
import ophyd import ophyd
from bec_lib.core import BECMessage, MessageEndpoints from bec_lib import messages, MessageEndpoints
from ophyd_devices.epics.devices.pilatus_csaxs import PilatuscSAXS from ophyd_devices.epics.devices.pilatus_csaxs import PilatuscSAXS
from tests.utils import DMMock, MockPV from tests.utils import DMMock, MockPV
@ -172,9 +172,9 @@ def test_publish_file_location(mock_det, scaninfo):
mock_det.filepath = scaninfo["filepath"] mock_det.filepath = scaninfo["filepath"]
mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"]) mock_det._publish_file_location(done=scaninfo["done"], successful=scaninfo["successful"])
if scaninfo["successful"] is None: if scaninfo["successful"] is None:
msg = BECMessage.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps()
else: else:
msg = BECMessage.FileMessage( msg = messages.FileMessage(
file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"]
).dumps() ).dumps()
expected_calls = [ expected_calls = [

View File

@ -1,5 +1,5 @@
from bec_lib.core.devicemanager import DeviceContainer from bec_lib.devicemanager import DeviceContainer
from bec_lib.core.tests.utils import ProducerMock from bec_lib.tests.utils import ProducerMock
from unittest import mock from unittest import mock