From b12292246fd9d8204c76cd1f4927da5a1b981857 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Sat, 17 Feb 2024 17:44:52 +0100 Subject: [PATCH] Revert "fix(deprecation): remove all remaining .dumps(), .loads() and producer->connector" This reverts commit 4159f3e3ec20727b395808118f3c0c166d9d1c0c --- ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py | 7 +++++-- ophyd_devices/epics/devices/eiger9m_csaxs.py | 10 +++++----- ophyd_devices/epics/devices/falcon_csaxs.py | 10 +++++----- ophyd_devices/epics/devices/mcs_csaxs.py | 4 ++-- ophyd_devices/epics/devices/pilatus_csaxs.py | 10 +++++----- .../epics/devices/psi_delay_generator_base.py | 2 +- .../epics/devices/psi_detector_base.py | 2 +- ophyd_devices/rt_lamni/rt_lamni_ophyd.py | 16 ++++++++-------- ophyd_devices/sim/sim.py | 18 ++++++++++-------- ophyd_devices/sim/sim_test_devices.py | 18 ++++++++++-------- ophyd_devices/sim/sim_xtreme.py | 4 ++-- ophyd_devices/utils/bec_scaninfo_mixin.py | 4 ++-- ophyd_devices/utils/bec_utils.py | 14 +++++++------- tests/test_eiger9m_csaxs.py | 14 +++++++------- tests/test_falcon_csaxs.py | 12 ++++++------ tests/test_mcs_card.py | 8 ++++---- tests/test_pilatus_csaxs.py | 12 ++++++------ tests/utils.py | 4 ++-- 18 files changed, 88 insertions(+), 81 deletions(-) diff --git a/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py b/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py index 582e88e..256b124 100644 --- a/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py +++ b/ophyd_devices/eiger1p5m_csaxs/eiger1p5m.py @@ -81,7 +81,8 @@ class Eiger1p5MDetector(Device): self.username = "e20588" # TODO get from config def _get_current_scan_msg(self) -> messages.ScanStatusMessage: - return self.device_manager.connector.get(MessageEndpoints.scan_status()) + msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) + return messages.ScanStatusMessage.loads(msg) def _get_scan_dir(self, scan_bundle, scan_number, leading_zeros=None): if leading_zeros is None: @@ -158,7 +159,9 @@ class Eiger1p5MDetector(Device): self.detector_control.put("stop") signals = {"config": self.read(), "data": self.file_name} msg = messages.DeviceMessage(signals=signals, metadata=self.metadata) - self.device_manager.connector.set_and_publish(MessageEndpoints.device_read(self.name), msg) + self.device_manager.producer.set_and_publish( + MessageEndpoints.device_read(self.name), msg.dumps() + ) self._stopped = False return super().unstage() diff --git a/ophyd_devices/epics/devices/eiger9m_csaxs.py b/ophyd_devices/epics/devices/eiger9m_csaxs.py index 9873326..219453f 100644 --- a/ophyd_devices/epics/devices/eiger9m_csaxs.py +++ b/ophyd_devices/epics/devices/eiger9m_csaxs.py @@ -301,20 +301,20 @@ class Eiger9MSetup(CustomDetectorMixin): done (bool): True if scan is finished successful (bool): True if scan was successful """ - pipe = self.parent.connector.pipeline() + pipe = self.parent.producer.pipeline() if successful is None: msg = messages.FileMessage(file_path=self.parent.filepath, done=done) else: msg = messages.FileMessage( file_path=self.parent.filepath, done=done, successful=successful ) - self.parent.connector.set_and_publish( + self.parent.producer.set_and_publish( MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), - msg, + msg.dumps(), pipe=pipe, ) - self.parent.connector.set_and_publish( - MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe + self.parent.producer.set_and_publish( + MessageEndpoints.file_event(self.parent.name), msg.dumps(), pipe=pipe ) pipe.execute() diff --git a/ophyd_devices/epics/devices/falcon_csaxs.py b/ophyd_devices/epics/devices/falcon_csaxs.py index 09d61ff..fea5173 100644 --- a/ophyd_devices/epics/devices/falcon_csaxs.py +++ b/ophyd_devices/epics/devices/falcon_csaxs.py @@ -244,20 +244,20 @@ class FalconSetup(CustomDetectorMixin): done (bool): True if scan is finished successful (bool): True if scan was successful """ - pipe = self.parent.connector.pipeline() + pipe = self.parent.producer.pipeline() if successful is None: msg = messages.FileMessage(file_path=self.parent.filepath, done=done) else: msg = messages.FileMessage( file_path=self.parent.filepath, done=done, successful=successful ) - self.parent.connector.set_and_publish( + self.parent.producer.set_and_publish( MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), - msg, + msg.dumps(), pipe=pipe, ) - self.parent.connector.set_and_publish( - MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe + self.parent.producer.set_and_publish( + MessageEndpoints.file_event(self.parent.name), msg.dumps(), pipe=pipe ) pipe.execute() diff --git a/ophyd_devices/epics/devices/mcs_csaxs.py b/ophyd_devices/epics/devices/mcs_csaxs.py index 5158b78..f87b809 100644 --- a/ophyd_devices/epics/devices/mcs_csaxs.py +++ b/ophyd_devices/epics/devices/mcs_csaxs.py @@ -136,8 +136,8 @@ class MCSSetup(CustomDetectorMixin): msg = messages.DeviceMessage( signals=dict(self.mca_data), metadata=self.parent.scaninfo.scan_msg.metadata, - ) - self.parent.connector.xadd( + ).dumps() + self.parent.producer.xadd( topic=MessageEndpoints.device_async_readback( scanID=self.parent.scaninfo.scanID, device=self.parent.name ), diff --git a/ophyd_devices/epics/devices/pilatus_csaxs.py b/ophyd_devices/epics/devices/pilatus_csaxs.py index 390db83..ee13b98 100644 --- a/ophyd_devices/epics/devices/pilatus_csaxs.py +++ b/ophyd_devices/epics/devices/pilatus_csaxs.py @@ -331,7 +331,7 @@ class PilatusSetup(CustomDetectorMixin): done (bool): True if scan is finished successful (bool): True if scan was successful """ - pipe = self.parent.connector.pipeline() + pipe = self.parent.producer.pipeline() if successful is None: msg = messages.FileMessage( file_path=self.parent.filepath, @@ -345,13 +345,13 @@ class PilatusSetup(CustomDetectorMixin): successful=successful, metadata={"input_path": self.parent.filepath_raw}, ) - self.parent.connector.set_and_publish( + self.parent.producer.set_and_publish( MessageEndpoints.public_file(self.parent.scaninfo.scanID, self.parent.name), - msg, + msg.dumps(), pipe=pipe, ) - self.parent.connector.set_and_publish( - MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe + self.parent.producer.set_and_publish( + MessageEndpoints.file_event(self.parent.name), msg.dumps(), pipe=pipe ) pipe.execute() diff --git a/ophyd_devices/epics/devices/psi_delay_generator_base.py b/ophyd_devices/epics/devices/psi_delay_generator_base.py index 5068343..76641c6 100644 --- a/ophyd_devices/epics/devices/psi_delay_generator_base.py +++ b/ophyd_devices/epics/devices/psi_delay_generator_base.py @@ -391,7 +391,7 @@ class PSIDelayGeneratorBase(Device): self.device_manager = device_manager else: self.device_manager = bec_utils.DMMock() - self.connector = self.device_manager.connector + self.producer = self.device_manager.producer self._update_scaninfo() self._init() diff --git a/ophyd_devices/epics/devices/psi_detector_base.py b/ophyd_devices/epics/devices/psi_detector_base.py index 36450d2..176f82c 100644 --- a/ophyd_devices/epics/devices/psi_detector_base.py +++ b/ophyd_devices/epics/devices/psi_detector_base.py @@ -228,7 +228,7 @@ class PSIDetectorBase(Device): self.device_manager = bec_utils.DMMock() base_path = kwargs["basepath"] if "basepath" in kwargs else "~/Data10/" self.service_cfg = {"base_path": os.path.expanduser(base_path)} - self.connector = self.device_manager.connector + self.producer = self.device_manager.producer self._update_scaninfo() self._update_filewriter() self._init() diff --git a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py index 72bbd8a..0e70d64 100644 --- a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py +++ b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py @@ -308,9 +308,9 @@ class RtLamniController(Controller): def _update_flyer_device_info(self): flyer_info = self._get_flyer_device_info() - self.get_device_manager().connector.set( + self.get_device_manager().producer.set( MessageEndpoints.device_info("rt_scan"), - messages.DeviceInfoMessage(device="rt_scan", info=flyer_info), + messages.DeviceInfoMessage(device="rt_scan", info=flyer_info).dumps(), ) def _get_flyer_device_info(self) -> dict: @@ -385,11 +385,11 @@ class RtLamniController(Controller): # if not (mode==2 or mode==3): # error - self.get_device_manager().connector.set_and_publish( + self.get_device_manager().producer.set_and_publish( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=1, metadata=self.readout_metadata - ), + ).dumps(), ) # while scan is running while mode > 0: @@ -420,11 +420,11 @@ class RtLamniController(Controller): signals = self._get_signals_from_table(return_table) self.publish_device_data(signals=signals, pointID=int(return_table[0])) - self.get_device_manager().connector.set_and_publish( + self.get_device_manager().producer.set_and_publish( MessageEndpoints.device_status("rt_scan"), messages.DeviceStatusMessage( device="rt_scan", status=0, metadata=self.readout_metadata - ), + ).dumps(), ) logger.info( @@ -432,12 +432,12 @@ class RtLamniController(Controller): ) def publish_device_data(self, signals, pointID): - self.get_device_manager().connector.send( + self.get_device_manager().producer.send( MessageEndpoints.device_read("rt_lamni"), messages.DeviceMessage( signals=signals, metadata={"pointID": pointID, **self.readout_metadata}, - ), + ).dumps(), ) def feedback_status_angle_lamni(self) -> bool: diff --git a/ophyd_devices/sim/sim.py b/ophyd_devices/sim/sim.py index d390592..6dcae0f 100644 --- a/ophyd_devices/sim/sim.py +++ b/ophyd_devices/sim/sim.py @@ -494,32 +494,34 @@ class SynFlyer(Device, PositionerBase): } }, metadata={"pointID": ii, **metadata}, - ) + ).dumps() ) ttime.sleep(exp_time) elapsed_time += exp_time if elapsed_time > buffer_time: elapsed_time = 0 - device.device_manager.connector.send( - MessageEndpoints.device_read(device.name), bundle + device.device_manager.producer.send( + MessageEndpoints.device_read(device.name), bundle.dumps() ) bundle = messages.BundleMessage() - device.device_manager.connector.set_and_publish( + device.device_manager.producer.set_and_publish( MessageEndpoints.device_status(device.name), messages.DeviceStatusMessage( device=device.name, status=1, metadata={"pointID": ii, **metadata}, - ), + ).dumps(), ) - device.device_manager.connector.send(MessageEndpoints.device_read(device.name), bundle) - device.device_manager.connector.set_and_publish( + device.device_manager.producer.send( + MessageEndpoints.device_read(device.name), bundle.dumps() + ) + device.device_manager.producer.set_and_publish( MessageEndpoints.device_status(device.name), messages.DeviceStatusMessage( device=device.name, status=0, metadata={"pointID": num_pos, **metadata}, - ), + ).dumps(), ) print("done") diff --git a/ophyd_devices/sim/sim_test_devices.py b/ophyd_devices/sim/sim_test_devices.py index 6f3029c..2030b66 100644 --- a/ophyd_devices/sim/sim_test_devices.py +++ b/ophyd_devices/sim/sim_test_devices.py @@ -115,32 +115,34 @@ class SynFlyerLamNI(Device, PositionerBase): } }, metadata={"pointID": ii, **metadata}, - ) + ).dumps() ) ttime.sleep(exp_time) elapsed_time += exp_time if elapsed_time > buffer_time: elapsed_time = 0 - device.device_manager.connector.send( - MessageEndpoints.device_read(device.name), bundle + device.device_manager.producer.send( + MessageEndpoints.device_read(device.name), bundle.dumps() ) bundle = messages.BundleMessage() - device.device_manager.connector.set_and_publish( + device.device_manager.producer.set_and_publish( MessageEndpoints.device_status(device.name), messages.DeviceStatusMessage( device=device.name, status=1, metadata={"pointID": ii, **metadata}, - ), + ).dumps(), ) - device.device_manager.connector.send(MessageEndpoints.device_read(device.name), bundle) - device.device_manager.connector.set_and_publish( + device.device_manager.producer.send( + MessageEndpoints.device_read(device.name), bundle.dumps() + ) + device.device_manager.producer.set_and_publish( MessageEndpoints.device_status(device.name), messages.DeviceStatusMessage( device=device.name, status=0, metadata={"pointID": num_pos, **metadata}, - ), + ).dumps(), ) print("done") diff --git a/ophyd_devices/sim/sim_xtreme.py b/ophyd_devices/sim/sim_xtreme.py index fcd80b5..fea60b5 100644 --- a/ophyd_devices/sim/sim_xtreme.py +++ b/ophyd_devices/sim/sim_xtreme.py @@ -357,8 +357,8 @@ class SynXtremeOtfReplay(FlyerInterface, Device): } msg = messages.DeviceMessage( signals=signals, metadata=self._device_manager.devices.otf.metadata - ) - self._device_manager.connector.set_and_publish( + ).dumps() + self._device_manager.producer.set_and_publish( MessageEndpoints.device_readback("signals"), msg ) diff --git a/ophyd_devices/utils/bec_scaninfo_mixin.py b/ophyd_devices/utils/bec_scaninfo_mixin.py index fbcbc39..64ba158 100644 --- a/ophyd_devices/utils/bec_scaninfo_mixin.py +++ b/ophyd_devices/utils/bec_scaninfo_mixin.py @@ -96,7 +96,7 @@ class BecScaninfoMixin: messages.ScanStatusMessage: messages.ScanStatusMessage object """ if not self.sim_mode: - msg = self.device_manager.connector.get(MessageEndpoints.scan_status()) + msg = self.device_manager.producer.get(MessageEndpoints.scan_status()) if not isinstance(msg, messages.ScanStatusMessage): return None return msg @@ -112,7 +112,7 @@ class BecScaninfoMixin: if self.sim_mode: return getpass.getuser() - msg = self.device_manager.connector.get(MessageEndpoints.account()) + msg = self.device_manager.producer.get(MessageEndpoints.account()) if msg: return msg return getpass.getuser() diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index 756716f..94c9cf7 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -122,17 +122,17 @@ class ProducerMock: class PipelineMock: _pipe_buffer = [] - _connector = None + _producer = None - def __init__(self, connector) -> None: - self._connector = connector + def __init__(self, producer) -> None: + self._producer = producer def execute(self): - if not self._connector.store_data: + if not self._producer.store_data: self._pipe_buffer = [] return [] res = [ - getattr(self._connector, method)(*args, **kwargs) + getattr(self._producer, method)(*args, **kwargs) for method, args, kwargs in self._pipe_buffer ] self._pipe_buffer = [] @@ -142,13 +142,13 @@ class PipelineMock: class DMMock: """Mock for DeviceManager - The mocked DeviceManager creates a device containert and a connector. + The mocked DeviceManager creates a device containert and a producer. """ def __init__(self): self.devices = DeviceContainer() - self.connector = ProducerMock() + self.producer = ProducerMock() def add_device(self, name: str, value: float = 0.0): self.devices[name] = DeviceMock(name, value) diff --git a/tests/test_eiger9m_csaxs.py b/tests/test_eiger9m_csaxs.py index cf25ec3..00ffd80 100644 --- a/tests/test_eiger9m_csaxs.py +++ b/tests/test_eiger9m_csaxs.py @@ -27,7 +27,7 @@ def mock_det(): prefix = "X12SA-ES-EIGER9M:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ), mock.patch( @@ -50,7 +50,7 @@ def test_init(): prefix = "X12SA-ES-EIGER9M:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ), mock.patch( @@ -428,24 +428,24 @@ def test_publish_file_location(mock_det, scaninfo): done=scaninfo["done"], successful=scaninfo["successful"] ) if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) + msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() else: msg = messages.FileMessage( file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) + ).dumps() expected_calls = [ mock.call( MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), mock.call( MessageEndpoints.file_event(mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls + assert mock_det.producer.set_and_publish.call_args_list == expected_calls def test_stop(mock_det): diff --git a/tests/test_falcon_csaxs.py b/tests/test_falcon_csaxs.py index b07915e..2622a68 100644 --- a/tests/test_falcon_csaxs.py +++ b/tests/test_falcon_csaxs.py @@ -27,7 +27,7 @@ def mock_det(): prefix = "X12SA-SITORO:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ) as filemixin, mock.patch( @@ -215,24 +215,24 @@ def test_publish_file_location(mock_det, scaninfo): done=scaninfo["done"], successful=scaninfo["successful"] ) if scaninfo["successful"] is None: - msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]) + msg = messages.FileMessage(file_path=scaninfo["filepath"], done=scaninfo["done"]).dumps() else: msg = messages.FileMessage( file_path=scaninfo["filepath"], done=scaninfo["done"], successful=scaninfo["successful"] - ) + ).dumps() expected_calls = [ mock.call( MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), mock.call( MessageEndpoints.file_event(mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls + assert mock_det.producer.set_and_publish.call_args_list == expected_calls @pytest.mark.parametrize( diff --git a/tests/test_mcs_card.py b/tests/test_mcs_card.py index 841dac6..ac62bc6 100644 --- a/tests/test_mcs_card.py +++ b/tests/test_mcs_card.py @@ -32,7 +32,7 @@ def mock_det(): prefix = "X12SA-MCS:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ) as filemixin, mock.patch( @@ -53,7 +53,7 @@ def test_init(): prefix = "X12SA-ES-EIGER9M:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ), mock.patch( @@ -184,7 +184,7 @@ def test_send_data_to_bec(mock_det, metadata, mca_data): mock_det.custom_prepare._send_data_to_bec() device_metadata = mock_det.scaninfo.scan_msg.metadata metadata.update({"async_update": "append", "num_lines": mock_det.num_lines.get()}) - data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata) + data = messages.DeviceMessage(signals=dict(mca_data), metadata=device_metadata).dumps() calls = mock.call( topic=MessageEndpoints.device_async_readback( scanID=metadata["scanID"], device=mock_det.name @@ -193,7 +193,7 @@ def test_send_data_to_bec(mock_det, metadata, mca_data): expire=1800, ) - assert mock_det.connector.xadd.call_args == calls + assert mock_det.producer.xadd.call_args == calls @pytest.mark.parametrize( diff --git a/tests/test_pilatus_csaxs.py b/tests/test_pilatus_csaxs.py index 642e0e0..81c4629 100644 --- a/tests/test_pilatus_csaxs.py +++ b/tests/test_pilatus_csaxs.py @@ -28,7 +28,7 @@ def mock_det(): prefix = "X12SA-ES-PILATUS300K:" sim_mode = False dm = DMMock() - with mock.patch.object(dm, "connector"): + with mock.patch.object(dm, "producer"): with mock.patch( "ophyd_devices.epics.devices.psi_detector_base.FileWriterMixin" ), mock.patch( @@ -207,27 +207,27 @@ def test_publish_file_location(mock_det, scaninfo): file_path=scaninfo["filepath"], done=scaninfo["done"], metadata={"input_path": scaninfo["filepath_raw"]}, - ) + ).dumps() else: msg = messages.FileMessage( file_path=scaninfo["filepath"], done=scaninfo["done"], metadata={"input_path": scaninfo["filepath_raw"]}, successful=scaninfo["successful"], - ) + ).dumps() expected_calls = [ mock.call( MessageEndpoints.public_file(scaninfo["scanID"], mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), mock.call( MessageEndpoints.file_event(mock_det.name), msg, - pipe=mock_det.connector.pipeline.return_value, + pipe=mock_det.producer.pipeline.return_value, ), ] - assert mock_det.connector.set_and_publish.call_args_list == expected_calls + assert mock_det.producer.set_and_publish.call_args_list == expected_calls @pytest.mark.parametrize( diff --git a/tests/utils.py b/tests/utils.py index 8d0fd6b..84ebce8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -296,13 +296,13 @@ class DeviceMock: class DMMock: """Mock for DeviceManager - The mocked DeviceManager creates a device containert and a connector. + The mocked DeviceManager creates a device containert and a producer. """ def __init__(self): self.devices = DeviceContainer() - self.connector = ProducerMock() + self.producer = ProducerMock() def add_device(self, name: str, value: float = 0.0): """Add device to the DeviceManagerMock"""