diff --git a/bec_client/bec_client/callbacks.py b/bec_client/bec_client/callbacks.py index 4cb7b7be..a0d9cbde 100644 --- a/bec_client/bec_client/callbacks.py +++ b/bec_client/bec_client/callbacks.py @@ -3,7 +3,7 @@ import logging import threading import time -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack import numpy as np from bec_utils import Alarms, DeviceManagerBase, DeviceStatus, MessageEndpoints @@ -75,7 +75,7 @@ async def live_updates_readback( while not all(stop_dev.is_set() for stop_dev in stop): msg = consumer.poll_messages() if msg is not None: - msg = KMessage.DeviceMessage.loads(msg.value).content["signals"] + msg = BMessage.DeviceMessage.loads(msg.value).content["signals"] for ind, dev in enumerate(devices): if dev in msg: dev_values[ind] = msg[dev].get("value") diff --git a/bec_client/bec_client/devicemanager_client.py b/bec_client/bec_client/devicemanager_client.py index 23af1d67..d75cefa0 100644 --- a/bec_client/bec_client/devicemanager_client.py +++ b/bec_client/bec_client/devicemanager_client.py @@ -3,7 +3,7 @@ import logging import time import uuid -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage from bec_utils import Device, DeviceManagerBase, MessageEndpoints from bec_client.callbacks import ScanRequestError @@ -84,7 +84,7 @@ def rpc(fcn): "args": args, "kwargs": kwargs, } - msg = KMessage.ScanQueueMessage( + msg = BMessage.ScanQueueMessage( scan_type="device_rpc", parameter=params, queue="primary", @@ -106,7 +106,7 @@ def rpc(fcn): if msg: break time.sleep(0.1) - msg = KMessage.DeviceRPCMessage.loads(msg) + msg = BMessage.DeviceRPCMessage.loads(msg) return msg.content.get("return_val") return wrapper @@ -167,7 +167,7 @@ class DeviceBase(Device): else: val = self.parent.producer.get(MessageEndpoints.device_read(self.name)) if val: - return KMessage.DeviceMessage.loads(val).content["signals"].get(self.name) + return BMessage.DeviceMessage.loads(val).content["signals"].get(self.name) else: return None @@ -280,8 +280,8 @@ class DMClient(DeviceManagerBase): self.devices._add_device(name, obj) print(time.time() - start) - def _get_device_info(self, device_name) -> KMessage.DeviceInfoMessage: - msg = KMessage.DeviceInfoMessage.loads( + def _get_device_info(self, device_name) -> BMessage.DeviceInfoMessage: + msg = BMessage.DeviceInfoMessage.loads( self.producer.get(MessageEndpoints.device_info(device_name)) ) return msg @@ -292,7 +292,7 @@ class DMClient(DeviceManagerBase): msg = self._get_device_info(dev.get("name")) self._add_device(dev, msg) - def _add_device(self, dev: dict, msg: KMessage.DeviceInfoMessage): + def _add_device(self, dev: dict, msg: BMessage.DeviceInfoMessage): name = msg.content["device"] info = msg.content["info"] diff --git a/bec_client/bec_client/scan_queue.py b/bec_client/bec_client/scan_queue.py index 3447d03f..8c535371 100644 --- a/bec_client/bec_client/scan_queue.py +++ b/bec_client/bec_client/scan_queue.py @@ -2,7 +2,7 @@ import logging import time from collections import deque -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage from bec_utils import MessageEndpoints logger = logging.getLogger("scan_queue") @@ -18,24 +18,24 @@ class ScanRequest: self.status = "pending" self.scanID = None # request.metadata.get("scanID") - def update_with_response(self, response: KMessage.RequestResponseMessage): + def update_with_response(self, response: BMessage.RequestResponseMessage): self.response = response self.decision_pending = False self.request_id = response.metadata["RID"] self.accepted = [response.content["decision"] == "accepted"] - def update_with_request(self, request: KMessage.ScanQueueMessage): + def update_with_request(self, request: BMessage.ScanQueueMessage): self.request = request self.request_id = request.metadata["RID"] @classmethod - def from_request(cls, request: KMessage.ScanQueueMessage): + def from_request(cls, request: BMessage.ScanQueueMessage): scan_req = cls() scan_req.update_with_request(request=request) return scan_req @classmethod - def from_response(cls, response: KMessage.RequestResponseMessage): + def from_response(cls, response: BMessage.RequestResponseMessage): scan_req = cls() scan_req.update_with_response(response) return scan_req @@ -208,7 +208,7 @@ class ScanQueue: action = "deferred_pause" if deferred_pause else "pause" self.parent.producer.send( MessageEndpoints.scan_queue_modification_request(), - KMessage.ScanQueueModificationMessage( + BMessage.ScanQueueModificationMessage( scanID=scanID, action=action, parameter={} ).dumps(), ) @@ -218,7 +218,7 @@ class ScanQueue: scanID = self.current_scanId self.parent.producer.send( MessageEndpoints.scan_queue_modification_request(), - KMessage.ScanQueueModificationMessage( + BMessage.ScanQueueModificationMessage( scanID=scanID, action="abort", parameter={} ).dumps(), ) @@ -228,7 +228,7 @@ class ScanQueue: scanID = self.current_scanId self.parent.producer.send( MessageEndpoints.scan_queue_modification_request(), - KMessage.ScanQueueModificationMessage( + BMessage.ScanQueueModificationMessage( scanID=scanID, action="continue", parameter={} ).dumps(), ) @@ -244,13 +244,13 @@ class ScanQueue: @staticmethod def _scan_segment_callback(msg, *, parent, **kwargs) -> None: - scan_msg = KMessage.ScanMessage.loads(msg.value) + scan_msg = BMessage.ScanMessage.loads(msg.value) if scan_msg is not None: parent.add_scan_msg(scan_msg) @staticmethod def _scan_queue_status_callback(msg, *, parent, **kwargs) -> None: - queue_status = KMessage.ScanQueueStatusMessage.loads(msg.value) + queue_status = BMessage.ScanQueueStatusMessage.loads(msg.value) if queue_status is not None: parent._update_queue_status(queue_status.content["queue"]) # if scan.metadata is not None: @@ -307,7 +307,7 @@ class ScanQueue: @staticmethod def _scan_queue_request_callback(msg, *, parent, **kwargs) -> None: - request = KMessage.ScanQueueMessage.loads(msg.value) + request = BMessage.ScanQueueMessage.loads(msg.value) if request.metadata is not None: if "RID" in request.metadata: if parent.scan_queue_requests.get(request.metadata["RID"]) is not None: @@ -319,7 +319,7 @@ class ScanQueue: @staticmethod def _scan_queue_request_response_callback(msg, *, parent, **kwargs) -> None: - response = KMessage.RequestResponseMessage.loads(msg.value) + response = BMessage.RequestResponseMessage.loads(msg.value) logger.debug(response) if parent.scan_queue_requests.get(response.metadata.get("RID")) is not None: parent.scan_queue_requests[response.metadata["RID"]].update_with_response(response) @@ -334,7 +334,7 @@ class ScanQueue: @staticmethod def _scan_status_callback(msg, *, parent, **kwargs) -> None: - scan = KMessage.ScanStatusMessage.loads(msg.value) + scan = BMessage.ScanStatusMessage.loads(msg.value) scan_number = scan.content["info"].get("scan_number") if scan_number: parent.last_scan_number = scan_number diff --git a/bec_client/bec_client/scans.py b/bec_client/bec_client/scans.py index 5e1fc0db..20ad7ee3 100644 --- a/bec_client/bec_client/scans.py +++ b/bec_client/bec_client/scans.py @@ -4,7 +4,7 @@ import uuid from contextlib import ContextDecorator import msgpack -from bec_utils import BECMessage as KMessage +from bec_utils import BECMessage as BMessage from bec_utils import MessageEndpoints from bec_utils.connector import ConsumerConnector from cytoolz import partition @@ -75,7 +75,7 @@ class ScanObject: else: return self.scan_info.get("scan_report_hint") - def _start_consumer(self, request: KMessage.ScanQueueMessage) -> ConsumerConnector: + def _start_consumer(self, request: BMessage.ScanQueueMessage) -> ConsumerConnector: consumer = self.parent.devicemanager.connector.consumer( [ MessageEndpoints.device_readback(dev) @@ -86,7 +86,7 @@ class ScanObject: ) return consumer - def _send_scan_request(self, request: KMessage.ScanQueueMessage) -> None: + def _send_scan_request(self, request: BMessage.ScanQueueMessage) -> None: self.parent.devicemanager.producer.send( MessageEndpoints.scan_queue_request(), request.dumps() ) @@ -140,7 +140,7 @@ class Scans: @staticmethod def _prepare_scan_request( scan_name: str, scan_info: dict, *args, **kwargs - ) -> KMessage.ScanQueueMessage: + ) -> BMessage.ScanQueueMessage: """Prepare scan request message with given scan arguments Args: @@ -153,7 +153,7 @@ class Scans: TypeError: Raised if an argument is not of the required type as specified in scan_info. Returns: - KMessage.ScanQueueMessage: _description_ + BMessage.ScanQueueMessage: _description_ """ arg_input = scan_info.get("arg_input") if arg_input is not None: @@ -182,7 +182,7 @@ class Scans: "args": Scans._parameter_bundler(args, arg_bundle_size), "kwargs": kwargs, } - return KMessage.ScanQueueMessage( + return BMessage.ScanQueueMessage( scan_type=scan_name, parameter=params, queue="primary", metadata=md ) diff --git a/koss/koss/bkqueue.py b/koss/koss/bkqueue.py index 22b0d6bc..83a9561e 100644 --- a/koss/koss/bkqueue.py +++ b/koss/koss/bkqueue.py @@ -7,7 +7,7 @@ import uuid from enum import Enum from typing import Union -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage from bec_utils import Alarms, MessageEndpoints from koss.scan_assembler import ScanAssembler @@ -31,7 +31,7 @@ class QueueManager: self.queues = {"primary": ScanQueue(self)} self._start_scan_queue_consumer() - def add_to_queue(self, scan_queue: str, msg: KMessage.ScanQueueMessage) -> None: + def add_to_queue(self, scan_queue: str, msg: BMessage.ScanQueueMessage) -> None: try: self.queues[scan_queue].append(msg) except LimitError as limit_error: @@ -58,7 +58,7 @@ class QueueManager: @staticmethod def _scan_queue_callback(msg, parent, **_kwargs) -> None: - scan_msg = KMessage.ScanQueueMessage.loads(msg.value) + scan_msg = BMessage.ScanQueueMessage.loads(msg.value) print("Receiving scan:", scan_msg.content, time.time()) # instructions = parent.scan_assembler.assemble_device_instructions(scan_msg) parent.add_to_queue("primary", scan_msg) @@ -66,13 +66,13 @@ class QueueManager: @staticmethod def _scan_queue_modification_callback(msg, parent, **_kwargs): - scan_mod_msg = KMessage.ScanQueueModificationMessage.loads(msg.value) + scan_mod_msg = BMessage.ScanQueueModificationMessage.loads(msg.value) print("Receiving scan modification:", scan_mod_msg.content) if scan_mod_msg: parent.scan_interception(scan_mod_msg) parent.send_queue_status() - def scan_interception(self, scan_mod_msg: KMessage.ScanQueueModificationMessage) -> None: + def scan_interception(self, scan_mod_msg: BMessage.ScanQueueModificationMessage) -> None: action = scan_mod_msg.content["action"] self.__getattribute__("_set_" + action)(scanID=scan_mod_msg.content["scanID"]) @@ -120,7 +120,7 @@ class QueueManager: queue_export[k] = {"info": queue_info, "status": scan_queue.status.name} self.producer.send( MessageEndpoints.scan_queue_status(), - KMessage.ScanQueueStatusMessage(queue=queue_export).dumps(), + BMessage.ScanQueueStatusMessage(queue=queue_export).dumps(), ) def shutdown(self): diff --git a/koss/koss/scan_assembler.py b/koss/koss/scan_assembler.py index 60da60dc..9abd13ba 100644 --- a/koss/koss/scan_assembler.py +++ b/koss/koss/scan_assembler.py @@ -1,4 +1,4 @@ -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import koss.scans as ks @@ -18,7 +18,7 @@ class ScanAssembler: def _load_scans(self): self._scans = self.parent.scan_dict - def _unpack_scan(self, msg: KMessage.ScanQueueMessage): + def _unpack_scan(self, msg: BMessage.ScanQueueMessage): scan = msg.content.get("scan_type") scan_cls = getattr(ks, self._scans[scan]["class"]) diff --git a/koss/koss/scan_guard.py b/koss/koss/scan_guard.py index af2f7b15..027ae5f8 100644 --- a/koss/koss/scan_guard.py +++ b/koss/koss/scan_guard.py @@ -1,4 +1,4 @@ -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack from bec_utils import MessageEndpoints @@ -100,7 +100,7 @@ class ScanGuard: def _scan_queue_request_callback(msg, parent, **kwargs): print( "Receiving scan request:", - KMessage.ScanQueueMessage.loads(msg.value).content, + BMessage.ScanQueueMessage.loads(msg.value).content, ) # pylint: disable=protected-access parent._handle_scan_request(msg.value) @@ -109,7 +109,7 @@ class ScanGuard: def _scan_queue_modification_request_callback(msg, parent, **kwargs): print( "Receiving scan modification request:", - KMessage.ScanQueueModificationMessage.loads(msg.value).content, + BMessage.ScanQueueModificationMessage.loads(msg.value).content, ) # pylint: disable=protected-access parent._handle_scan_modification_request(msg.value) @@ -118,7 +118,7 @@ class ScanGuard: decision = "accepted" if scan_request_decision["accepted"] else "rejected" self.dm.producer.send( MessageEndpoints.scan_queue_request_response(), - KMessage.RequestResponseMessage( + BMessage.RequestResponseMessage( decision=decision, message=scan_request_decision["message"], metadata=metadata, @@ -135,7 +135,7 @@ class ScanGuard: Returns: """ - msg = KMessage.ScanQueueMessage.loads(msg) + msg = BMessage.ScanQueueMessage.loads(msg) scan_request_decision = self._is_valid_scan_request(msg) accepted = scan_request_decision.get("accepted") @@ -156,7 +156,7 @@ class ScanGuard: Returns: """ - msg = KMessage.ScanQueueModificationMessage.loads(msg) + msg = BMessage.ScanQueueModificationMessage.loads(msg) self.dm.producer.send(MessageEndpoints.scan_queue_modification(), msg.dumps()) def _append_to_scan_queue(self, msg): diff --git a/koss/koss/scan_worker.py b/koss/koss/scan_worker.py index 37ed670e..864a35e0 100644 --- a/koss/koss/scan_worker.py +++ b/koss/koss/scan_worker.py @@ -4,12 +4,12 @@ import threading import time from enum import Enum -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack from bec_utils import DeviceStatus, MessageEndpoints -DeviceMsg = KMessage.DeviceInstructionMessage -ScanStatusMsg = KMessage.ScanStatusMessage +DeviceMsg = BMessage.DeviceInstructionMessage +ScanStatusMsg = BMessage.ScanStatusMessage class InstructionQueueStatus(Enum): diff --git a/koss/koss/scans.py b/koss/koss/scans.py index 32b625f9..ce18ea8d 100644 --- a/koss/koss/scans.py +++ b/koss/koss/scans.py @@ -1,13 +1,13 @@ import enum from abc import ABC, abstractmethod -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import numpy as np from bec_utils import DeviceManagerBase from cytoolz import partition -DeviceMsg = KMessage.DeviceInstructionMessage -ScanMsg = KMessage.ScanQueueMessage +DeviceMsg = BMessage.DeviceInstructionMessage +ScanMsg = BMessage.ScanQueueMessage class LimitError(Exception): diff --git a/koss/tests/test_scan_guard.py b/koss/tests/test_scan_guard.py index dd4fa413..da0a7add 100644 --- a/koss/tests/test_scan_guard.py +++ b/koss/tests/test_scan_guard.py @@ -1,5 +1,5 @@ import pytest -from bec_utils import BECMessage as KMessage +from bec_utils import BECMessage as BMessage from koss.devicemanager import DeviceManagerKOSS from koss.scan_guard import ScanGuard @@ -16,7 +16,7 @@ def test_check_motors_movable(): sg = ScanGuard(parent=k) sg._check_motors_movable( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="fermat_scan", parameter={ "args": {"samx": (-5, 5), "samy": (-5, 5)}, @@ -32,7 +32,7 @@ def test_check_motors_movable(): dm._load_config_device() sg._check_motors_movable( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="fermat_scan", parameter={ "args": {"samx": (-5, 5), "samy": (-5, 5)}, diff --git a/koss/tests/test_scans.py b/koss/tests/test_scans.py index 673ebed2..19fa4518 100644 --- a/koss/tests/test_scans.py +++ b/koss/tests/test_scans.py @@ -1,6 +1,6 @@ import numpy as np import pytest -from bec_utils import BECMessage as KMessage +from bec_utils import BECMessage as BMessage from koss.scans import FermatSpiralScan, Move, Scan @@ -32,31 +32,31 @@ class DMMock: "mv_msg,reference_msg_list", [ ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="mv", parameter={"args": {"samx": (1,), "samy": (2,)}, "kwargs": {}}, queue="primary", ), [ - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={"value": 1, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 0}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samy", action="set", parameter={"value": 2, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 1}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 2}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samy", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, @@ -65,7 +65,7 @@ class DMMock: ], ), ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="mv", parameter={ "args": {"samx": (1,), "samy": (2,), "samz": (3,)}, @@ -74,37 +74,37 @@ class DMMock: queue="primary", ), [ - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={"value": 1, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 0}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samy", action="set", parameter={"value": 2, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 1}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samz", action="set", parameter={"value": 3, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 2}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 3}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samy", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 4}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samz", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, @@ -113,19 +113,19 @@ class DMMock: ], ), ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="mv", parameter={"args": {"samx": (1,)}, "kwargs": {}}, queue="primary", ), [ - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={"value": 1, "group": "scan_motor", "wait_group": "scan_motor"}, metadata={"stream": "primary", "DIID": 0}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="wait", parameter={"type": "move", "group": "scan_motor", "wait_group": "scan_motor"}, @@ -162,13 +162,13 @@ def test_scan_move(mv_msg, reference_msg_list): "scan_msg,reference_scan_list", [ ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="grid_scan", parameter={"args": {"samx": (-5, 5, 3)}, "kwargs": {}}, queue="primary", ), [ - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=["samx"], action="read", parameter={ @@ -177,7 +177,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 3}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=["samx"], action="wait", parameter={ @@ -187,25 +187,25 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 4}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="open_scan", parameter={"primary": ["samx"], "num_points": 3, "scan_name": "grid_scan"}, metadata={"stream": "primary", "DIID": 0}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="stage", parameter={}, metadata={"stream": "primary", "DIID": 1}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="baseline_reading", parameter={}, metadata={"stream": "baseline", "DIID": 1}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={ @@ -215,7 +215,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 1}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -225,19 +225,19 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 2}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="trigger", parameter={"group": "trigger"}, metadata={"pointID": 0, "stream": "primary", "DIID": 3}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={"type": "trigger", "time": 0.1}, metadata={"stream": "primary", "DIID": 4}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="read", parameter={ @@ -247,7 +247,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"pointID": 0, "stream": "primary", "DIID": 5}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -257,7 +257,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 6}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={ @@ -267,7 +267,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 7}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -277,7 +277,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 8}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -287,19 +287,19 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 9}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="trigger", parameter={"group": "trigger"}, metadata={"pointID": 1, "stream": "primary", "DIID": 10}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={"type": "trigger", "time": 0.1}, metadata={"stream": "primary", "DIID": 11}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="read", parameter={ @@ -309,7 +309,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"pointID": 1, "stream": "primary", "DIID": 12}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -319,7 +319,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 13}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={ @@ -329,7 +329,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 14}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -339,7 +339,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 15}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -349,13 +349,13 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 16}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="trigger", parameter={"group": "trigger"}, metadata={"pointID": 2, "stream": "primary", "DIID": 17}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -364,7 +364,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 18}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="read", parameter={ @@ -374,7 +374,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"pointID": 2, "stream": "primary", "DIID": 19}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -384,7 +384,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 20}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device="samx", action="set", parameter={ @@ -394,7 +394,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 21}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -404,7 +404,7 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 22}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="wait", parameter={ @@ -414,13 +414,13 @@ def test_scan_move(mv_msg, reference_msg_list): }, metadata={"stream": "primary", "DIID": 23}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="unstage", parameter={}, metadata={"stream": "primary", "DIID": 24}, ), - KMessage.DeviceInstructionMessage( + BMessage.DeviceInstructionMessage( device=None, action="close_scan", parameter={}, @@ -449,7 +449,7 @@ def test_scan_scan(scan_msg, reference_scan_list): "scan_msg,reference_scan_list", [ ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="fermat_scan", parameter={ "args": {"samx": (-5, 5), "samy": (-5, 5)}, @@ -471,7 +471,7 @@ def test_scan_scan(scan_msg, reference_scan_list): ], ), ( - KMessage.ScanQueueMessage( + BMessage.ScanQueueMessage( scan_type="fermat_scan", parameter={ "args": {"samx": (-5, 5), "samy": (-5, 5)}, diff --git a/opaas/opaas/devices/devicemanageropaas.py b/opaas/opaas/devices/devicemanageropaas.py index 69cce9cf..4fcbf06a 100644 --- a/opaas/opaas/devices/devicemanageropaas.py +++ b/opaas/opaas/devices/devicemanageropaas.py @@ -1,7 +1,7 @@ import logging import time -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack import ophyd import ophyd.sim as ops @@ -21,7 +21,7 @@ class OPAASDevice(Device): self.metadata = {} def initialize_device_buffer(self, producer): - dev_msg = KMessage.DeviceMessage(signals=self.obj.read(), metadata={}).dumps() + dev_msg = BMessage.DeviceMessage(signals=self.obj.read(), metadata={}).dumps() pipe = producer.pipeline() producer.set_and_publish(MessageEndpoints.device_readback(self.name), dev_msg, pipe=pipe) producer.set(topic=MessageEndpoints.device_read(self.name), msg=dev_msg, pipe=pipe) @@ -114,7 +114,7 @@ class DeviceManagerOPAAS(DeviceManagerBase): interface = get_device_info(obj, {}) self.producer.set( MessageEndpoints.device_info(obj.name), - KMessage.DeviceInfoMessage(device=obj.name, info=interface).dumps(), + BMessage.DeviceInfoMessage(device=obj.name, info=interface).dumps(), ) def reset_device_data(self, obj) -> None: @@ -124,14 +124,14 @@ class DeviceManagerOPAAS(DeviceManagerBase): self.producer.r.delete(MessageEndpoints.device_info(obj.name)) def _obj_callback_readback(self, *args, **kwargs): - # print(KMessage.DeviceMessage(signals=kwargs["obj"].read()).content) + # print(BMessage.DeviceMessage(signals=kwargs["obj"].read()).content) # start = time.time() obj = kwargs["obj"] if obj.connected: name = kwargs["obj"].root.name signals = kwargs["obj"].read() metadata = self.devices.get(kwargs["obj"].root.name).metadata - dev_msg = KMessage.DeviceMessage(signals=signals, metadata=metadata).dumps() + dev_msg = BMessage.DeviceMessage(signals=signals, metadata=metadata).dumps() pipe = self.producer.pipeline() self.producer.set_and_publish(MessageEndpoints.device_readback(name), dev_msg, pipe) pipe.execute() diff --git a/opaas/opaas/opaas.py b/opaas/opaas/opaas.py index 803af06c..bd71b64c 100644 --- a/opaas/opaas/opaas.py +++ b/opaas/opaas/opaas.py @@ -6,7 +6,7 @@ import time from functools import reduce from io import StringIO -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack import ophyd from bec_utils import Alarms, MessageEndpoints @@ -93,7 +93,7 @@ class OPAAS: @staticmethod def consumer_interception_callback(msg, *, parent, **kwargs) -> None: - mvalue = KMessage.ScanQueueModificationMessage.loads(msg.value) + mvalue = BMessage.ScanQueueModificationMessage.loads(msg.value) logger.info("Receiving: %s", mvalue.content) if mvalue.content.get("action") == "deferred_pause": pass @@ -108,7 +108,7 @@ class OPAAS: @staticmethod def instructions_callback(msg, *, parent, **kwargs) -> None: - instructions = KMessage.DeviceInstructionMessage.loads(msg.value) + instructions = BMessage.DeviceInstructionMessage.loads(msg.value) if instructions.content["device"] is not None: # pylint: disable=protected-access parent._update_device_metadata(instructions) @@ -177,7 +177,7 @@ class OPAAS: # send result to client self.producer.set( MessageEndpoints.device_rpc(instr_params.get("rpc_id")), - KMessage.DeviceRPCMessage( + BMessage.DeviceRPCMessage( device=instr.content["device"], return_val=res, out=result.getvalue(), @@ -192,7 +192,7 @@ class OPAAS: # send error to client self.producer.set( MessageEndpoints.device_rpc(instr_params.get("rpc_id")), - KMessage.DeviceRPCMessage( + BMessage.DeviceRPCMessage( device=instr.content["device"], return_val=None, out={"error": exc.__class__.__name__, "msg": exc.args}, @@ -220,7 +220,7 @@ class OPAAS: metadata = self.device_manager.devices.get(dev).metadata self.producer.set_and_publish( MessageEndpoints.device_read(dev), - KMessage.DeviceMessage(signals=signals, metadata=metadata).dumps(), + BMessage.DeviceMessage(signals=signals, metadata=metadata).dumps(), pipe, ) status_info = metadata diff --git a/scan_bundler/scan_bundler/scan_bundler.py b/scan_bundler/scan_bundler/scan_bundler.py index 7bab341a..84a2259c 100644 --- a/scan_bundler/scan_bundler/scan_bundler.py +++ b/scan_bundler/scan_bundler/scan_bundler.py @@ -3,7 +3,7 @@ import time import uuid from collections.abc import Iterable -import bec_utils.BECMessage as KMessage +import bec_utils.BECMessage as BMessage import msgpack import numpy as np from bec_utils import MessageEndpoints @@ -60,7 +60,7 @@ class ScanBundler: @staticmethod def _device_read_callback(msg, parent, **kwargs): dev = msg.topic.decode().split(MessageEndpoints._device_read + "/")[-1].split(":sub")[0] - msg = KMessage.DeviceMessage.loads(msg.value) + msg = BMessage.DeviceMessage.loads(msg.value) if msg.content["signals"].get(dev) is not None: parent._add_device_to_storage( msg.metadata["scanID"], dev, msg.content["signals"], msg.metadata @@ -70,7 +70,7 @@ class ScanBundler: @staticmethod def _scan_queue_callback(msg, parent, **kwargs): - msg = KMessage.ScanQueueStatusMessage.loads(msg.value) + msg = BMessage.ScanQueueStatusMessage.loads(msg.value) print(msg) for q in msg.content["queue"]["primary"].get("info"): for rb in q.get("request_blocks"): @@ -80,11 +80,11 @@ class ScanBundler: @staticmethod def _scan_status_callback(msg, parent, **kwargs): - msg = KMessage.ScanStatusMessage.loads(msg.value) + msg = BMessage.ScanStatusMessage.loads(msg.value) if msg.content.get("status") != "open": parent._scan_status_modification(msg) - def _scan_status_modification(self, msg: KMessage.ScanStatusMessage): + def _scan_status_modification(self, msg: BMessage.ScanStatusMessage): if msg.content.get("status") == "closed": scanID = msg.content.get("scanID") if scanID: @@ -277,7 +277,7 @@ class ScanBundler: self.producer.send( MessageEndpoints.scan_segment(), - KMessage.ScanMessage( + BMessage.ScanMessage( point_id=pointID, scanID=scanID, data=self.sync_storage[scanID][pointID],