diff --git a/bec_lib/bec_lib/async_data.py b/bec_lib/bec_lib/async_data.py index b78fffa4..6a9ba3b4 100644 --- a/bec_lib/bec_lib/async_data.py +++ b/bec_lib/bec_lib/async_data.py @@ -31,9 +31,9 @@ class AsyncDataHandler: async_data = {} for device_key in async_device_keys: key = device_key.decode() - device_name = key.split(MessageEndpoints.device_async_readback(scan_id, ""))[-1].split( - ":" - )[0] + device_name = key.split(MessageEndpoints.device_async_readback(scan_id, "").endpoint)[ + -1 + ].split(":")[0] data = self.get_async_data_for_device(scan_id, device_name) if not data: continue diff --git a/bec_lib/bec_lib/bec_service.py b/bec_lib/bec_lib/bec_service.py index 4bf96184..fac160ba 100644 --- a/bec_lib/bec_lib/bec_service.py +++ b/bec_lib/bec_lib/bec_service.py @@ -109,7 +109,7 @@ class BECService: ) def _update_existing_services(self): - service_keys = self.connector.keys(MessageEndpoints.service_status("*")) + service_keys = self.connector.keys(MessageEndpoints.service_status("*").endpoint) if not service_keys: return services = [service.decode().split(":", maxsplit=1)[0] for service in service_keys] diff --git a/bec_lib/bec_lib/device.py b/bec_lib/bec_lib/device.py index fcc7a8fd..9b18dc4c 100644 --- a/bec_lib/bec_lib/device.py +++ b/bec_lib/bec_lib/device.py @@ -92,7 +92,7 @@ class Status: while True: request_status = self._connector.lrange( - MessageEndpoints.device_req_status(self._RID), 0, -1 + MessageEndpoints.device_req_status_container(self._RID), 0, -1 ) if request_status: break diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 80e7d98d..b26860b7 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1,103 +1,48 @@ +""" +Endpoints for communication within the BEC. +""" + # pylint: disable=too-many-public-methods -from string import Template +import enum +from dataclasses import dataclass + +from bec_lib import messages + + +class MessageOp(list[str], enum.Enum): + """Message operation enum""" + + SET_PUBLISH = ["register", "set_and_publish", "delete", "get", "keys"] + SEND = ["send", "register"] + STREAM = ["xadd", "xrange", "xread", "register_stream", "keys"] + LIST = ["lpush", "lrange", "rpush", "ltrim", "keys"] + SET = ["set", "get", "delete", "keys"] + + +@dataclass +class EndpointInfo: + """ + Dataclass for endpoint info. + + Args: + endpoint (str): Endpoint. + message_type (messages.BECMessage): Message type. + message_op (MessageOp): Message operation. + """ + + endpoint: str + message_type: messages.BECMessage + message_op: MessageOp class MessageEndpoints: - # devices feedback - _device_status = "internal/devices/status" - _device_read = "internal/devices/read" - _device_read_configuration = "internal/devices/read_configuration" - _device_readback = "internal/devices/readback" - _device_limits = "internal/devices/limits" - _device_req_status = "internal/devices/req_status" - _device_progress = "internal/devices/progress" - _device_async_readback = Template("internal/devices/async_readback/$scanID/$device") - - # device config - _device_config_request = "internal/devices/config_request" - _device_config_request_response = "internal/devices/config_request_response" - _device_server_config_update = "internal/devices/device_server_config_update" - _device_config_update = "internal/devices/config_update" - _device_config = "internal/devices/config" - _device_config_history = "internal/devices/config_history" - _device_info = "internal/devices/info" - _device_staged = "internal/devices/staged" - - # device monitoring - _device_monitor = "internal/devices/monitor" - - # scan queue - _scan_queue_modification = "internal/queue/queue_modification" - _scan_queue_modification_request = "internal/queue/queue_modification_request" - _scan_queue_insert = "internal/queue/queue_insert" - _scan_queue_request = "internal/queue/queue_request" - _scan_queue_request_response = "internal/queue/queue_request_response" - _scan_queue_status = "internal/queue/queue_status" - _scan_queue_history = "internal/queue/queue_history" - - # scan info - _scan_number = "scans/scan_number" - _dataset_number = "scans/dataset_number" - _scan_status = "scans/scan_status" - _scan_status_list = "scans/scan_status_list" - _available_scans = "scans/available_scans" - _scan_segment = "scans/scan_segment" - _scan_baseline = "scans/scan_baseline" - _bluesky_events = "scans/bluesky-events" - _public_scan_info = Template("public/$scanID/scan_info") - _public_scan_segment = Template("public/$scanID/scan_segment/$pointID") - _public_scan_baseline = Template("public/$scanID/scan_baseline") - _public_file = Template("public/$scanID/file/$name") - _file_event = "public/file_event" - _file_content = "internal/file_content" - - # instructions - _device_instructions = "internal/devices/instructions" - _device_rpc = "internal/devices/rpc" - _pre_scan_macros = "internal/pre_scan_macros" - _post_scan_macros = "internal/post_scan_macros" - - # log - _log = "internal/log" - _alarms = "internal/alarms" - - # service - _services_status = "internal/services/status" - _metrics = "internal/services/metrics" - _service_response = "internal/services/response" - - # misc - _public_global_vars = "public/vars" - _observer = "internal/observer" - _progress = "public/progress" - - # logbook - _logbook = "internal/logbook" - - # scibec - _scibec = "internal/scibec" - - # experiment - _account = "internal/account" - - # data processing - _processed_data = "public/processed_data" - _dap_config = "internal/dap/config" - _available_dap_plugins = "internal/dap/available_plugins" - _dap_request = "internal/dap/request" - _dap_response = "internal/dap/response" - - # GUI - _gui_config = "public/gui/config" - _gui_data = "public/gui/data" - _gui_instructions = "public/gui/instruction" - _gui_instruction_response = "public/gui/instruction_response" - - ########## + """ + Class for message endpoints. + """ # devices feedback - @classmethod - def device_status(cls, device: str) -> str: + @staticmethod + def device_status(device: str) -> EndpointInfo: """ Endpoint for device status. This endpoint is used by the device server to publish the device status using a messages.DeviceStatusMessage message. @@ -105,10 +50,13 @@ class MessageEndpoints: Args: device (str): Device name, e.g. "samx". """ - return f"{cls._device_status}/{device}" + endpoint = f"internal/devices/status/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceStatusMessage, message_op=MessageOp.SET + ) - @classmethod - def device_read(cls, device: str) -> str: + @staticmethod + def device_read(device: str) -> EndpointInfo: """ Endpoint for device readings. This endpoint is used by the device server to publish the device readings using a messages.DeviceMessage message. @@ -117,12 +65,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device readings of the specified device. + EndpointInfo: Endpoint for device readings of the specified device. """ - return f"{cls._device_read}/{device}" + endpoint = f"internal/devices/read/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def device_read_configuration(cls, device: str) -> str: + @staticmethod + def device_read_configuration(device: str) -> EndpointInfo: """ Endpoint for device configuration readings. This endpoint is used by the device server to publish the device configuration readings using a messages.DeviceMessage message. @@ -131,12 +82,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device configuration readings of the specified device. + EndpointInfo: Endpoint for device configuration readings of the specified device. """ - return f"{cls._device_read_configuration}/{device}" + endpoint = f"internal/devices/read_configuration/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def device_readback(cls, device: str) -> str: + @staticmethod + def device_readback(device: str) -> EndpointInfo: """ Endpoint for device readbacks. This endpoint is used by the device server to publish the device readbacks using a messages.DeviceMessage message. @@ -145,12 +99,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device readbacks of the specified device. + EndpointInfo: Endpoint for device readbacks of the specified device. """ - return f"{cls._device_readback}/{device}" + endpoint = f"internal/devices/readback/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def device_limits(cls, device: str) -> str: + @staticmethod + def device_limits(device: str) -> EndpointInfo: """ Endpoint for device limits. This endpoint is used by the device server to publish the device limits using a messages.DeviceMessage message. @@ -159,12 +116,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device limits of the specified device. + EndpointInfo: Endpoint for device limits of the specified device. """ - return f"{cls._device_limits}/{device}" + endpoint = f"internal/devices/limits/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def device_req_status(cls, device: str) -> str: + @staticmethod + def device_req_status(device: str) -> EndpointInfo: """ Endpoint for device request status. This endpoint is used by the device server to publish the device request status using a messages.DeviceReqStatusMessage message. @@ -173,12 +133,36 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device request status of the specified device. + EndpointInfo: Endpoint for device request status of the specified device. """ - return f"{cls._device_req_status}/{device}" + endpoint = f"internal/devices/req_status/{device}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DeviceReqStatusMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def device_progress(cls, device: str) -> str: + @staticmethod + def device_req_status_container(RID: str) -> EndpointInfo: + """ + Endpoint for device request status container. This endpoint is used by the device server to publish + the device request status using a messages.DeviceReqStatusMessage message. + + Args: + RID (str): Request ID. + + Returns: + EndpointInfo: Endpoint for device request status container. + """ + endpoint = f"internal/devices/req_status_container/{RID}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DeviceReqStatusMessage, + message_op=MessageOp.LIST, + ) + + @staticmethod + def device_progress(device: str) -> EndpointInfo: """ Endpoint for device progress. This endpoint is used by the device server to publish the device progress using a messages.ProgressMessage message. @@ -187,25 +171,33 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device progress of the specified device. + EndpointInfo: Endpoint for device progress of the specified device. """ - return f"{cls._device_progress}/{device}" + endpoint = f"internal/devices/progress/{device}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ProgressMessage, + message_op=MessageOp.SET_PUBLISH, + ) # device config - @classmethod - def device_config_request(cls) -> str: + @staticmethod + def device_config_request() -> EndpointInfo: """ Endpoint for device config request. This endpoint can be used to request a modification to the device config. The request is sent using a messages.DeviceConfigMessage message. Returns: - str: Endpoint for device config request. + EndpointInfo: Endpoint for device config request. """ - return cls._device_config_request + endpoint = "internal/devices/config_request" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceConfigMessage, message_op=MessageOp.SEND + ) - @classmethod - def device_config_request_response(cls, RID: str) -> str: + @staticmethod + def device_config_request_response(RID: str) -> EndpointInfo: """ Endpoint for device config request response. This endpoint is used by the device server and scihub connector to inform about whether the device config @@ -216,12 +208,17 @@ class MessageEndpoints: RID (str): Request ID. Returns: - str: Endpoint for device config request response. + EndpointInfo: Endpoint for device config request response. """ - return f"{cls._device_config_request_response}/{RID}" + endpoint = f"internal/devices/config_request_response/{RID}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.RequestResponseMessage, + message_op=MessageOp.SET, + ) - @classmethod - def device_server_config_request(cls) -> str: + @staticmethod + def device_server_config_request() -> EndpointInfo: """ Endpoint for device server config request. This endpoint can be used to request changes to config. Typically used by the scihub connector following a @@ -229,48 +226,62 @@ class MessageEndpoints: The request is sent using a messages.DeviceConfigMessage message. Returns: - str: Endpoint for device server config request. + EndpointInfo: Endpoint for device server config request. """ - return cls._device_server_config_update + endpoint = "internal/devices/device_server_config_update" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceConfigMessage, message_op=MessageOp.SEND + ) - @classmethod - def device_config_update(cls) -> str: + @staticmethod + def device_config_update() -> EndpointInfo: """ Endpoint for device config update. This endpoint is used by the scihub connector to inform about a change to the device config. The update is sent using a messages.DeviceConfigMessage message. Returns: - str: Endpoint for device config update. + EndpointInfo: Endpoint for device config update. """ - return cls._device_config_update + endpoint = "internal/devices/config_update" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceConfigMessage, message_op=MessageOp.SEND + ) - @classmethod - def device_config(cls) -> str: + @staticmethod + def device_config() -> EndpointInfo: """ Endpoint for device config. This endpoint is used by the scihub connector to set the device config. Returns: - str: Endpoint for device config. + EndpointInfo: Endpoint for device config. """ - return cls._device_config + endpoint = "internal/devices/config" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceConfigMessage, message_op=MessageOp.SET + ) - @classmethod - def device_config_history(cls) -> str: + @staticmethod + def device_config_history() -> EndpointInfo: """ Endpoint for device config history. This endpoint is used to keep track of the device config history using a messages.AvailableResourceMessage message. The endpoint is connected to a redis list. Returns: - str: Endpoint for device config history. + EndpointInfo: Endpoint for device config history. """ - return cls._device_config_history + endpoint = "internal/devices/config_history" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.AvailableResourceMessage, + message_op=MessageOp.LIST, + ) - @classmethod - def device_info(cls, device: str) -> str: + @staticmethod + def device_info(device: str) -> EndpointInfo: """ Endpoint for device info. This endpoint is used by the device server to publish the device info using a messages.DeviceInfoMessage message. @@ -279,12 +290,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for device info of the specified device. + EndpointInfo: Endpoint for device info of the specified device. """ - return f"{cls._device_info}/{device}" + endpoint = f"internal/devices/info/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceInfoMessage, message_op=MessageOp.SET + ) - @classmethod - def device_staged(cls, device: str) -> str: + @staticmethod + def device_staged(device: str) -> EndpointInfo: """ Endpoint for the device stage status. This endpoint is used by the device server to publish the device stage status using a messages.DeviceStatusMessage message. @@ -295,12 +309,15 @@ class MessageEndpoints: device (str): Device name, e.g. "samx". Returns: - str: Endpoint for the device stage status of the specified device. + EndpointInfo: Endpoint for the device stage status of the specified device. """ - return f"{cls._device_staged}/{device}" + endpoint = f"internal/devices/staged/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceStatusMessage, message_op=MessageOp.SET + ) - @classmethod - def device_async_readback(cls, scanID: str, device: str) -> str: + @staticmethod + def device_async_readback(scanID: str, device: str) -> EndpointInfo: """ Endpoint for receiving an async device readback over Redis streams. This endpoint is used by the device server to publish async device @@ -313,12 +330,15 @@ class MessageEndpoints: device (str): Device name, e.g. "mcs". Returns: - str: Endpoint for the async device readback. + EndpointInfo: Endpoint for device async readback of the specified device. """ - return cls._device_async_readback.substitute(scanID=scanID, device=device) + endpoint = f"internal/devices/async_readback/{scanID}/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.STREAM + ) - @classmethod - def device_monitor(cls, device: str) -> str: + @staticmethod + def device_monitor(device: str) -> EndpointInfo: """ Endpoint for device monitoring. This endpoint is used to publish image or wavefrom data from a monitor. An @@ -326,30 +346,37 @@ class MessageEndpoints: forward a subset from the data to the monitoring endpoing for visualization purposes. Details on shape and type of data need to be specified in dtype/dshape of the dev..describe() method. - #TODO: Add here information that it should be a RedisStream? Args: device (str): Device name, e.g. "eiger". Returns: - str: Endpoint for device monitor of the specified device. + EndpointInfo: Endpoint for device monitoring. """ - return f"{cls._device_monitor}/{device}" + endpoint = f"internal/devices/monitor/{device}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceMessage, message_op=MessageOp.STREAM + ) # scan queue - @classmethod - def scan_queue_modification(cls) -> str: + @staticmethod + def scan_queue_modification() -> EndpointInfo: """ Endpoint for scan queue modification. This endpoint is used to publish accepted scan queue modifications using a messages.ScanQueueModificationMessage message. Returns: - str: Endpoint for scan queue modification. + EndpointInfo: Endpoint for scan queue modification. """ - return cls._scan_queue_modification + endpoint = "internal/queue/queue_modification" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanQueueModificationMessage, + message_op=MessageOp.SEND, + ) - @classmethod - def scan_queue_modification_request(cls) -> str: + @staticmethod + def scan_queue_modification_request() -> EndpointInfo: """ Endpoint for scan queue modification request. This endpoint is used to request a scan queue modification using a messages.ScanQueueModificationMessage message. @@ -357,12 +384,17 @@ class MessageEndpoints: endpoint. Returns: - str: Endpoint for scan queue modification request. + EndpointInfo: Endpoint for scan queue modification request. """ - return cls._scan_queue_modification_request + endpoint = "internal/queue/queue_modification_request" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanQueueModificationMessage, + message_op=MessageOp.SEND, + ) - @classmethod - def scan_queue_insert(cls) -> str: + @staticmethod + def scan_queue_insert() -> EndpointInfo: """ Endpoint for scan queue inserts. This endpoint is used to publish accepted scans using a messages.ScanQueueMessage message. @@ -370,60 +402,81 @@ class MessageEndpoints: scan queue. Returns: - str: Endpoint for scan queue inserts. + EndpointInfo: Endpoint for scan queue inserts. """ - return cls._scan_queue_insert + endpoint = "internal/queue/queue_insert" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanQueueMessage, message_op=MessageOp.SEND + ) - @classmethod - def scan_queue_request(cls) -> str: + @staticmethod + def scan_queue_request() -> EndpointInfo: """ Endpoint for scan queue request. This endpoint is used to request the new scans. The request is sent using a messages.ScanQueueMessage message. Returns: - str: Endpoint for scan queue request. + EndpointInfo: Endpoint for scan queue request. """ - return cls._scan_queue_request + endpoint = "internal/queue/queue_request" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanQueueMessage, message_op=MessageOp.SEND + ) - @classmethod - def scan_queue_request_response(cls) -> str: + @staticmethod + def scan_queue_request_response() -> EndpointInfo: """ Endpoint for scan queue request response. This endpoint is used to publish the information on whether the scan request was accepted or rejected. The response is sent using a messages.RequestResponseMessage message. Returns: - str: Endpoint for scan queue request response. + EndpointInfo: Endpoint for scan queue request response. """ - return cls._scan_queue_request_response + endpoint = "internal/queue/queue_request_response" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.RequestResponseMessage, + message_op=MessageOp.SEND, + ) - @classmethod - def scan_queue_status(cls) -> str: + @staticmethod + def scan_queue_status() -> EndpointInfo: """ Endpoint for scan queue status. This endpoint is used to publish the scan queue status using a messages.ScanQueueStatusMessage message. Returns: - str: Endpoint for scan queue status. + EndpointInfo: Endpoint for scan queue status. """ - return cls._scan_queue_status + endpoint = "internal/queue/queue_status" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanQueueStatusMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def scan_queue_history(cls) -> str: + @staticmethod + def scan_queue_history() -> EndpointInfo: """ Endpoint for scan queue history. This endpoint is used to keep track of the scan queue history using a messages.ScanQueueHistoryMessage message. The endpoint is connected to a redis list. Returns: - str: Endpoint for scan queue history. + EndpointInfo: Endpoint for scan queue history. """ - return cls._scan_queue_history + endpoint = "internal/queue/queue_history" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanQueueHistoryMessage, + message_op=MessageOp.LIST, + ) # scan info - @classmethod - def scan_number(cls) -> str: + @staticmethod + def scan_number() -> str: """ Endpoint for scan number. This endpoint is used to publish the scan number. The scan number is incremented after each scan and set in redis as an integer. @@ -431,10 +484,10 @@ class MessageEndpoints: Returns: str: Endpoint for scan number. """ - return cls._scan_number + return "scans/scan_number" - @classmethod - def dataset_number(cls) -> str: + @staticmethod + def dataset_number() -> str: """ Endpoint for dataset number. This endpoint is used to publish the dataset number. The dataset number is incremented after each dataset and set in redis as an integer. @@ -442,34 +495,42 @@ class MessageEndpoints: Returns: str: Endpoint for dataset number. """ - return cls._dataset_number + return "scans/dataset_number" - @classmethod - def scan_status(cls) -> str: + @staticmethod + def scan_status() -> EndpointInfo: """ Endpoint for scan status. This endpoint is used to publish the scan status using a messages.ScanStatusMessage message. Returns: - str: Endpoint for scan status. + EndpointInfo: Endpoint for scan status. """ - return cls._scan_status + endpoint = "scans/scan_status" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanStatusMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def available_scans(cls) -> str: + @staticmethod + def available_scans() -> EndpointInfo: """ Endpoint for available scans. This endpoint is used to publish the available scans - using a direct msgpack dump of a dictionary containing the available scans. - - #TODO: Change this to a messages.AvailableScans message. + using an AvailableResourceMessage. Returns: - str: Endpoint for available scans. + EndpointInfo: Endpoint for available scans. """ - return cls._available_scans + endpoint = "scans/available_scans" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.AvailableResourceMessage, + message_op=MessageOp.SET, + ) - @classmethod - def bluesky_events(cls) -> str: + @staticmethod + def bluesky_events() -> str: """ Endpoint for bluesky events. This endpoint is used by the scan bundler to publish the bluesky events using a direct msgpack dump of the bluesky event. @@ -477,10 +538,10 @@ class MessageEndpoints: Returns: str: Endpoint for bluesky events. """ - return cls._bluesky_events + return "scans/bluesky-events" - @classmethod - def scan_segment(cls) -> str: + @staticmethod + def scan_segment() -> EndpointInfo: """ Endpoint for scan segment. This endpoint is used by the scan bundler to publish the scan segment using a messages.ScanMessage message. @@ -488,10 +549,13 @@ class MessageEndpoints: Returns: str: Endpoint for scan segments. """ - return cls._scan_segment + endpoint = "scans/scan_segment" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanMessage, message_op=MessageOp.SEND + ) - @classmethod - def scan_baseline(cls) -> str: + @staticmethod + def scan_baseline() -> EndpointInfo: """ Endpoint for scan baseline readings. This endpoint is used by the scan bundler to publish the scan baseline readings using a messages.ScanBaselineMessage message. @@ -499,11 +563,16 @@ class MessageEndpoints: Returns: str: Endpoint for scan baseline readings. """ - return cls._scan_baseline + endpoint = "scans/scan_baseline" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ScanBaselineMessage, + message_op=MessageOp.SET_PUBLISH, + ) # instructions - @classmethod - def device_instructions(cls) -> str: + @staticmethod + def device_instructions() -> EndpointInfo: """ Endpoint for device instructions. This endpoint is used by the scan server to publish the device instructions using a messages.DeviceInstructionMessage message. @@ -513,10 +582,15 @@ class MessageEndpoints: Returns: str: Endpoint for device instructions. """ - return cls._device_instructions + endpoint = "internal/devices/instructions" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DeviceInstructionMessage, + message_op=MessageOp.SEND, + ) - @classmethod - def device_rpc(cls, rpc_id: str) -> str: + @staticmethod + def device_rpc(rpc_id: str) -> EndpointInfo: """ Endpoint for device rpc. This endpoint is used by the device server to publish the result of a device rpc using a messages.DeviceRPCMessage message. @@ -527,10 +601,13 @@ class MessageEndpoints: Returns: str: Endpoint for device rpc. """ - return f"{cls._device_rpc}/{rpc_id}" + endpoint = f"internal/devices/rpc/{rpc_id}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.DeviceRPCMessage, message_op=MessageOp.SET + ) - @classmethod - def pre_scan_macros(cls) -> str: + @staticmethod + def pre_scan_macros() -> str: """ Endpoint for pre scan macros. This endpoint is used to keep track of the pre scan macros. The endpoint is connected to a redis list. @@ -538,10 +615,10 @@ class MessageEndpoints: Returns: str: Endpoint for pre scan macros. """ - return cls._pre_scan_macros + return "internal/pre_scan_macros" - @classmethod - def post_scan_macros(cls) -> str: + @staticmethod + def post_scan_macros() -> str: """ Endpoint for post scan macros. This endpoint is used to keep track of the post scan macros. The endpoint is connected to a redis list. @@ -549,10 +626,10 @@ class MessageEndpoints: Returns: str: Endpoint for post scan macros. """ - return cls._post_scan_macros + return "internal/post_scan_macros" - @classmethod - def public_scan_info(cls, scanID: str) -> str: + @staticmethod + def public_scan_info(scanID: str) -> EndpointInfo: """ Endpoint for scan info. This endpoint is used by the scan worker to publish the scan info using a messages.ScanStatusMessage message. In contrast to the scan_info endpoint, @@ -565,10 +642,13 @@ class MessageEndpoints: str: Endpoint for scan info. """ - return cls._public_scan_info.substitute(scanID=scanID) + endpoint = f"public/{scanID}/scan_info" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanStatusMessage, message_op=MessageOp.SET + ) - @classmethod - def public_scan_segment(cls, scanID: str, pointID: int) -> str: + @staticmethod + def public_scan_segment(scanID: str, pointID: int) -> EndpointInfo: """ Endpoint for public scan segments. This endpoint is used by the scan bundler to publish the scan segment using a messages.ScanMessage message. In contrast to the @@ -583,10 +663,13 @@ class MessageEndpoints: str: Endpoint for scan segments. """ - return cls._public_scan_segment.substitute(scanID=scanID, pointID=pointID) + endpoint = f"public/{scanID}/scan_segment/{pointID}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanMessage, message_op=MessageOp.SET + ) - @classmethod - def public_scan_baseline(cls, scanID: str) -> str: + @staticmethod + def public_scan_baseline(scanID: str) -> EndpointInfo: """ Endpoint for public scan baseline readings. This endpoint is used by the scan bundler to publish the scan baseline readings using a messages.ScanBaselineMessage message. @@ -599,10 +682,13 @@ class MessageEndpoints: Returns: str: Endpoint for scan baseline readings. """ - return cls._public_scan_baseline.substitute(scanID=scanID) + endpoint = f"public/{scanID}/scan_baseline" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ScanBaselineMessage, message_op=MessageOp.SET + ) - @classmethod - def public_file(cls, scanID: str, name: str) -> str: + @staticmethod + def public_file(scanID: str, name: str) -> EndpointInfo: """ Endpoint for public file. This endpoint is used by the file writer to publish the status of the file writing using a messages.FileMessage message. @@ -614,10 +700,13 @@ class MessageEndpoints: Returns: str: Endpoint for public files. """ - return cls._public_file.substitute(scanID=scanID, name=name) + endpoint = f"public/{scanID}/file/{name}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.FileMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def file_event(cls, name: str) -> str: + @staticmethod + def file_event(name: str) -> EndpointInfo: """ Endpoint for public file_event. This endpoint is used by the file writer to publish the status of the file writing using a messages.FileMessage message. @@ -628,10 +717,13 @@ class MessageEndpoints: Returns: str: Endpoint for public file_events. """ - return f"{cls._file_event}/{name}" + endpoint = f"public/file_event/{name}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.FileMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def file_content(cls) -> str: + @staticmethod + def file_content() -> EndpointInfo: """ Endpoint for file content. This endpoint is used by the file writer to publish the file content using a messages.FileContentMessage message. @@ -639,11 +731,16 @@ class MessageEndpoints: Returns: str: Endpoint for file content. """ - return cls._file_content + endpoint = "internal/file_content" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.FileContentMessage, + message_op=MessageOp.SET_PUBLISH, + ) # log - @classmethod - def log(cls) -> str: + @staticmethod + def log() -> EndpointInfo: """ Endpoint for log. This endpoint is used by the redis connector to publish logs using a messages.LogMessage message. @@ -651,10 +748,13 @@ class MessageEndpoints: Returns: str: Endpoint for log. """ - return cls._log + endpoint = "internal/log" + return EndpointInfo( + endpoint=endpoint, message_type=messages.LogMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def alarm(cls) -> str: + @staticmethod + def alarm() -> EndpointInfo: """ Endpoint for alarms. This endpoint is used by the redis connector to publish alarms using a messages.AlarmMessage message. @@ -662,11 +762,14 @@ class MessageEndpoints: Returns: str: Endpoint for alarms. """ - return cls._alarms + endpoint = "internal/alarms" + return EndpointInfo( + endpoint=endpoint, message_type=messages.AlarmMessage, message_op=MessageOp.SET_PUBLISH + ) # service - @classmethod - def service_status(cls, service_id: str) -> str: + @staticmethod + def service_status(service_id: str) -> EndpointInfo: """ Endpoint for service status. This endpoint is used by all BEC services to publish their status using a messages.StatusMessage message. @@ -675,10 +778,13 @@ class MessageEndpoints: Args: service_id (str): Service ID, typically a uuid4 string. """ - return f"{cls._services_status}/{service_id}" + endpoint = f"internal/services/status/{service_id}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.StatusMessage, message_op=MessageOp.SET_PUBLISH + ) - @classmethod - def metrics(cls, service_id: str) -> str: + @staticmethod + def metrics(service_id: str) -> EndpointInfo: """ Endpoint for metrics. This endpoint is used by all BEC services to publish their performance metrics using a messages.ServiceMetricMessage message. @@ -689,10 +795,15 @@ class MessageEndpoints: Returns: str: Endpoint for metrics. """ - return f"{cls._metrics}/{service_id}" + endpoint = f"internal/services/metrics/{service_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ServiceMetricMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def service_response(cls, RID: str) -> str: + @staticmethod + def service_response(RID: str) -> EndpointInfo: """ Endpoint for service response. This endpoint is used by all BEC services to publish the result of a service request using a messages.ServiceResponseMessage message. @@ -703,11 +814,16 @@ class MessageEndpoints: Returns: str: Endpoint for service response. """ - return f"{cls._service_response}/{RID}" + endpoint = f"internal/services/response/{RID}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ServiceResponseMessage, + message_op=MessageOp.LIST, + ) # misc - @classmethod - def global_vars(cls, var_name: str) -> str: + @staticmethod + def global_vars(var_name: str) -> EndpointInfo: """ Endpoint for global variables. This endpoint is used to publish global variables using a messages.VariableMessage message. @@ -718,10 +834,13 @@ class MessageEndpoints: Returns: str: Endpoint for global variables. """ - return f"{cls._public_global_vars}/{var_name}" + endpoint = f"public/vars/{var_name}" + return EndpointInfo( + endpoint=endpoint, message_type=messages.VariableMessage, message_op=MessageOp.SET + ) - @classmethod - def observer(cls) -> str: + @staticmethod + def observer() -> EndpointInfo: """ Endpoint for observer. This endpoint is used to keep track of observer states using a. messages.ObserverMessage message. This endpoint is currently not used. @@ -729,10 +848,13 @@ class MessageEndpoints: Returns: str: Endpoint for observer. """ - return cls._observer + endpoint = "internal/observer" + return EndpointInfo( + endpoint=endpoint, message_type=messages.ObserverMessage, message_op=MessageOp.SET + ) - @classmethod - def progress(cls, var_name) -> str: + @staticmethod + def progress(var_name) -> EndpointInfo: """ Endpoint for progress. This endpoint is used to publish the current progress using a messages.ProgressMessage message. @@ -743,11 +865,16 @@ class MessageEndpoints: Returns: str: Endpoint for progress. """ - return f"{cls._progress}/{var_name}" + endpoint = f"public/progress/{var_name}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ProgressMessage, + message_op=MessageOp.SET_PUBLISH, + ) # logbook - @classmethod - def logbook(cls) -> str: + @staticmethod + def logbook() -> str: """ Endpoint for logbook. This endpoint is used to publish logbook info such as url, user and token using a direct msgpack dump of a dictionary. @@ -755,32 +882,35 @@ class MessageEndpoints: Returns: str: Endpoint for logbook. """ - return cls._logbook + return "internal/logbook" # scibec - @classmethod - def scibec(cls) -> str: + @staticmethod + def scibec() -> EndpointInfo: """ Endpoint for scibec. This endpoint is used to publish scibec info such as - url, user and token using a direct msgpack dump of a dictionary. + url, user and token using a CredentialsMessage. Returns: str: Endpoint for scibec. """ - return cls._scibec + endpoint = "internal/scibec" + return EndpointInfo( + endpoint=endpoint, message_type=messages.CredentialsMessage, message_op=MessageOp.SET + ) # experiment - @classmethod - def account(cls) -> str: + @staticmethod + def account() -> str: """ Endpoint for account. This endpoint is used to publish the current account. The value is set directly as a string. """ - return cls._account + return "internal/account" # data processing - @classmethod - def processed_data(cls, process_id: str) -> str: + @staticmethod + def processed_data(process_id: str) -> EndpointInfo: """ Endpoint for processed data. This endpoint is used to publish new processed data streams using a messages.ProcessedDataMessage message. @@ -791,10 +921,15 @@ class MessageEndpoints: Returns: str: Endpoint for processed data. """ - return f"{cls._processed_data}/{process_id}" + endpoint = f"public/processed_data/{process_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.ProcessedDataMessage, + message_op=MessageOp.STREAM, + ) - @classmethod - def dap_config(cls) -> str: + @staticmethod + def dap_config() -> EndpointInfo: """ Endpoint for DAP configuration. This endpoint is used to publish the DAP configuration using a messages.DAPConfigMessage message. @@ -802,10 +937,15 @@ class MessageEndpoints: Returns: str: Endpoint for DAP configuration. """ - return cls._dap_config + endpoint = "internal/dap/config" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DAPConfigMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def dap_available_plugins(cls, plugin_id: str) -> str: + @staticmethod + def dap_available_plugins(plugin_id: str) -> EndpointInfo: """ Endpoint for available DAP plugins. This endpoint is used to publish the available DAP plugins using a messages.AvailableResourceMessage message. @@ -816,10 +956,15 @@ class MessageEndpoints: Returns: str: Endpoint for available DAP plugins. """ - return f"{cls._available_dap_plugins}/{plugin_id}" + endpoint = f"internal/dap/available_plugins/{plugin_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.AvailableResourceMessage, + message_op=MessageOp.SET, + ) - @classmethod - def dap_request(cls) -> str: + @staticmethod + def dap_request() -> EndpointInfo: """ Endpoint for DAP request. This endpoint is used to request a DAP using a messages.DAPRequestMessage message. @@ -827,10 +972,15 @@ class MessageEndpoints: Returns: str: Endpoint for DAP request. """ - return cls._dap_request + endpoint = "internal/dap/request" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DAPRequestMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def dap_response(cls, RID: str) -> str: + @staticmethod + def dap_response(RID: str) -> EndpointInfo: """ Endpoint for DAP response. This endpoint is used to publish the DAP response using a messages.DAPResponseMessage message. @@ -841,11 +991,16 @@ class MessageEndpoints: Returns: str: Endpoint for DAP response. """ - return f"{cls._dap_response}/{RID}" + endpoint = f"internal/dap/response/{RID}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.DAPResponseMessage, + message_op=MessageOp.SET_PUBLISH, + ) # GUI - @classmethod - def gui_config(cls, gui_id: str) -> str: + @staticmethod + def gui_config(gui_id: str) -> EndpointInfo: """ Endpoint for GUI configuration. This endpoint is used to publish the GUI configuration using a messages.GUIConfigMessage message. @@ -853,10 +1008,15 @@ class MessageEndpoints: Returns: str: Endpoint for GUI configuration. """ - return f"{cls._gui_config}/{gui_id}" + endpoint = f"public/gui/config/{gui_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.GUIConfigMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def gui_data(cls, gui_id: str) -> str: + @staticmethod + def gui_data(gui_id: str) -> EndpointInfo: """ Endpoint for GUI data. This endpoint is used to publish the GUI data using a messages.GUIDataMessage message. @@ -864,10 +1024,15 @@ class MessageEndpoints: Returns: str: Endpoint for GUI data. """ - return f"{cls._gui_data}/{gui_id}" + endpoint = f"public/gui/data/{gui_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.GUIDataMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def gui_instructions(cls, gui_id: str) -> str: + @staticmethod + def gui_instructions(gui_id: str) -> EndpointInfo: """ Endpoint for GUI instructions. This endpoint is used to publish the GUI instructions using a messages.GUIInstructionMessage message. @@ -875,10 +1040,15 @@ class MessageEndpoints: Returns: str: Endpoint for GUI instructions. """ - return f"{cls._gui_instructions}/{gui_id}" + endpoint = f"public/gui/instruction/{gui_id}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.GUIInstructionMessage, + message_op=MessageOp.SET_PUBLISH, + ) - @classmethod - def gui_instruction_response(cls, RID: str) -> str: + @staticmethod + def gui_instruction_response(RID: str) -> EndpointInfo: """ Endpoint for GUI instruction response. This endpoint is used to publish the GUI instruction response using a messages.RequestResponseMessage message. @@ -886,4 +1056,9 @@ class MessageEndpoints: Returns: str: Endpoint for GUI instruction response. """ - return f"{cls._gui_instruction_response}/{RID}" + endpoint = f"public/gui/instruction_response/{RID}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.RequestResponseMessage, + message_op=MessageOp.SET_PUBLISH, + ) diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index abfa1113..c8660284 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1,3 +1,7 @@ +""" +BECMessage classes for communication between BEC components. +""" + from __future__ import annotations import enum @@ -83,8 +87,7 @@ class BundleMessage(BECMessage): """append a new BECMessage to the bundle""" if not isinstance(msg, BECMessage): raise AttributeError(f"Cannot append message of type {msg.__class__.__name__}") - else: - self.messages.append(msg) + self.messages.append(msg) def __len__(self): return len(self.messages) diff --git a/bec_lib/bec_lib/redis_connector.py b/bec_lib/bec_lib/redis_connector.py index cccb5d49..f14aa5c2 100644 --- a/bec_lib/bec_lib/redis_connector.py +++ b/bec_lib/bec_lib/redis_connector.py @@ -14,9 +14,9 @@ import redis import redis.client import redis.exceptions -from bec_lib.logger import bec_logger from bec_lib.connector import ConnectorBase, MessageObject -from bec_lib.endpoints import MessageEndpoints +from bec_lib.endpoints import EndpointInfo, MessageEndpoints +from bec_lib.logger import bec_logger from bec_lib.messages import AlarmMessage, BECMessage, LogMessage from bec_lib.serialization import MsgpackSerialization @@ -24,6 +24,30 @@ if TYPE_CHECKING: from bec_lib.alarm_handler import Alarms +def _validate_endpoint(func, endpoint): + if not isinstance(endpoint, EndpointInfo): + return + if func.__name__ not in endpoint.message_op: + raise ValueError(f"Endpoint {endpoint} is not compatible with {func.__name__} method") + + +def check_topic(func): + @wraps(func) + def wrapper(self, topic, *args, **kwargs): + if isinstance(topic, str): + warnings.warn( + "RedisConnector methods with a string topic are deprecated and should not be used anymore. Use RedisConnector methods with an EndpointInfo instead.", + DeprecationWarning, + ) + return func(self, topic, *args, **kwargs) + if isinstance(topic, EndpointInfo): + _validate_endpoint(func, topic) + return func(self, topic.endpoint, *args, **kwargs) + return func(self, topic, *args, **kwargs) + + return wrapper + + class RedisConnector(ConnectorBase): def __init__(self, bootstrap: list, redis_cls=None): super().__init__(bootstrap) @@ -74,21 +98,10 @@ class RedisConnector(ConnectorBase): """send an error as log""" self.send(MessageEndpoints.log(), LogMessage(log_type="error", log_msg=msg)) - def raise_alarm( - self, - severity: Alarms, - alarm_type: str, - source: str, - msg: str, - metadata: dict, - ): + def raise_alarm(self, severity: Alarms, alarm_type: str, source: str, msg: str, metadata: dict): """raise an alarm""" alarm_msg = AlarmMessage( - severity=severity, - alarm_type=alarm_type, - source=source, - msg=msg, - metadata=metadata, + severity=severity, alarm_type=alarm_type, source=source, msg=msg, metadata=metadata ) self.set_and_publish(MessageEndpoints.alarm(), alarm_msg) @@ -112,6 +125,7 @@ class RedisConnector(ConnectorBase): client = pipe if pipe is not None else self._redis_conn client.publish(topic, msg) + @check_topic def send(self, topic: str, msg: BECMessage, pipe=None) -> None: """send to redis""" if not isinstance(msg, BECMessage): @@ -124,8 +138,7 @@ class RedisConnector(ConnectorBase): # under the hood, it uses asyncio - this lets the possibility to stop # the loop on demand self._events_listener_thread = threading.Thread( - target=self._get_messages_loop, - args=(self._pubsub_conn,), + target=self._get_messages_loop, args=(self._pubsub_conn,) ) self._events_listener_thread.start() # make a weakref from the callable, using louie; @@ -135,6 +148,9 @@ class RedisConnector(ConnectorBase): if patterns is not None: if isinstance(patterns, str): patterns = [patterns] + elif isinstance(patterns, EndpointInfo): + _validate_endpoint(self.register, patterns) + patterns = [patterns.endpoint] self._pubsub_conn.psubscribe(patterns) for pattern in patterns: @@ -142,6 +158,9 @@ class RedisConnector(ConnectorBase): else: if isinstance(topics, str): topics = [topics] + elif isinstance(topics, EndpointInfo): + _validate_endpoint(self.register, topics) + topics = [topics.endpoint] self._pubsub_conn.subscribe(topics) for topic in topics: @@ -181,10 +200,7 @@ class RedisConnector(ConnectorBase): callbacks = self._topics_cb[msg["pattern"].decode()] else: callbacks = self._topics_cb[channel] - msg = MessageObject( - topic=channel, - value=MsgpackSerialization.loads(msg["data"]), - ) + msg = MessageObject(topic=channel, value=MsgpackSerialization.loads(msg["data"])) for cb_ref, kwargs in callbacks: cb = cb_ref() if cb: @@ -214,6 +230,7 @@ class RedisConnector(ConnectorBase): while self.poll_messages(): ... + @check_topic def lpush( self, topic: str, msg: str, pipe=None, max_size: int = None, expire: int = None ) -> None: @@ -234,12 +251,14 @@ class RedisConnector(ConnectorBase): if not pipe: client.execute() + @check_topic def lset(self, topic: str, index: int, msg: str, pipe=None) -> None: client = pipe if pipe is not None else self._redis_conn if isinstance(msg, BECMessage): msg = MsgpackSerialization.dumps(msg) return client.lset(topic, index, msg) + @check_topic def rpush(self, topic: str, msg: str, pipe=None) -> int: """O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments. Insert all the specified @@ -251,6 +270,7 @@ class RedisConnector(ConnectorBase): msg = MsgpackSerialization.dumps(msg) return client.rpush(topic, msg) + @check_topic def lrange(self, topic: str, start: int, end: int, pipe=None): """O(S+N) where S is the distance of start offset from HEAD for small lists, from nearest end (HEAD or TAIL) for large lists; and N is the @@ -262,16 +282,17 @@ class RedisConnector(ConnectorBase): cmd_result = client.lrange(topic, start, end) if pipe: return cmd_result - else: - # in case of command executed in a pipe, use 'execute_pipeline' method - ret = [] - for msg in cmd_result: - try: - ret.append(MsgpackSerialization.loads(msg)) - except RuntimeError: - ret.append(msg) - return ret + # in case of command executed in a pipe, use 'execute_pipeline' method + ret = [] + for msg in cmd_result: + try: + ret.append(MsgpackSerialization.loads(msg)) + except RuntimeError: + ret.append(msg) + return ret + + @check_topic def set_and_publish(self, topic: str, msg, pipe=None, expire: int = None) -> None: """piped combination of self.publish and self.set""" client = pipe if pipe is not None else self.pipeline() @@ -283,6 +304,7 @@ class RedisConnector(ConnectorBase): if not pipe: client.execute() + @check_topic def set(self, topic: str, msg, pipe=None, expire: int = None) -> None: """set redis value""" client = pipe if pipe is not None else self._redis_conn @@ -292,13 +314,18 @@ class RedisConnector(ConnectorBase): def keys(self, pattern: str) -> list: """returns all keys matching a pattern""" + if isinstance(pattern, EndpointInfo): + _validate_endpoint(self.keys, pattern) + pattern = pattern.endpoint return self._redis_conn.keys(pattern) + @check_topic def delete(self, topic, pipe=None): """delete topic""" client = pipe if pipe is not None else self._redis_conn client.delete(topic) + @check_topic def get(self, topic: str, pipe=None): """retrieve entry, either via hgetall or get""" client = pipe if pipe is not None else self._redis_conn @@ -311,6 +338,7 @@ class RedisConnector(ConnectorBase): except RuntimeError: return data + @check_topic def xadd(self, topic: str, msg_dict: dict, max_size=None, pipe=None, expire: int = None): """ add to stream @@ -345,6 +373,7 @@ class RedisConnector(ConnectorBase): if not pipe and expire: client.execute() + @check_topic def get_last(self, topic: str, key="data"): """retrieve last entry from stream""" client = self._redis_conn @@ -359,13 +388,9 @@ class RedisConnector(ConnectorBase): return msg_dict return msg_dict.get(key) + @check_topic def xread( - self, - topic: str, - id: str = None, - count: int = None, - block: int = None, - from_start=False, + self, topic: str, id: str = None, count: int = None, block: int = None, from_start=False ) -> list: """ read from stream @@ -422,6 +447,7 @@ class RedisConnector(ConnectorBase): self.stream_keys[topic] = index return out if out else None + @check_topic def xrange(self, topic: str, min: str, max: str, count: int = None): """ read a range from stream @@ -464,9 +490,9 @@ class RedisConnector(ConnectorBase): In order to keep this fail-safe and simple it uses 'mock'... """ - from unittest.mock import ( + from unittest.mock import ( # import is done here, to not pollute the file with something normally in tests Mock, - ) # import is done here, to not pollute the file with something normally in tests + ) warnings.warn( "RedisConnector.consumer() is deprecated and should not be used anymore. Use RedisConnector.register() with 'topics', 'patterns', 'cb' or 'start_thread' instead. Additional keyword args are transmitted to the callback. For the caller, the main difference with RedisConnector.register() is that it does not return a new thread.", diff --git a/bec_lib/bec_lib/scan_report.py b/bec_lib/bec_lib/scan_report.py index 0f1a2b25..b45c4e7d 100644 --- a/bec_lib/bec_lib/scan_report.py +++ b/bec_lib/bec_lib/scan_report.py @@ -90,7 +90,7 @@ class ScanReport: """get the status of a move request""" motors = list(self.request.request.content["parameter"]["args"].keys()) request_status = self._client.device_manager.connector.lrange( - MessageEndpoints.device_req_status(self.request.requestID), 0, -1 + MessageEndpoints.device_req_status_container(self.request.requestID), 0, -1 ) if len(request_status) == len(motors): return True diff --git a/bec_lib/bec_lib/tests/utils.py b/bec_lib/bec_lib/tests/utils.py index 69bfc819..3a6b2a9e 100644 --- a/bec_lib/bec_lib/tests/utils.py +++ b/bec_lib/bec_lib/tests/utils.py @@ -13,7 +13,7 @@ import yaml from bec_lib import BECClient, messages from bec_lib.connector import ConnectorBase from bec_lib.devicemanager import DeviceManagerBase -from bec_lib.endpoints import MessageEndpoints +from bec_lib.endpoints import EndpointInfo, MessageEndpoints from bec_lib.logger import bec_logger from bec_lib.scans import Scans from bec_lib.service_config import ServiceConfig @@ -542,24 +542,18 @@ class ConnectorMock(ConnectorBase): # pragma: no cover def register(self, *args, **kwargs): pass - def set(self, *args, **kwargs): - pass - - def set_and_publish(self, *args, **kwargs): - pass - def keys(self, *args, **kwargs): return [] def set(self, topic, msg, pipe=None, expire: int = None): if pipe: - pipe._pipe_buffer.append(("set", (topic, msg), {"expire": expire})) + pipe._pipe_buffer.append(("set", (topic.endpoint, msg), {"expire": expire})) return self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) def raw_send(self, topic, msg, pipe=None): if pipe: - pipe._pipe_buffer.append(("send", (topic, msg), {})) + pipe._pipe_buffer.append(("send", (topic.endpoint, msg), {})) return self.message_sent.append({"queue": topic, "msg": msg}) @@ -570,7 +564,7 @@ class ConnectorMock(ConnectorBase): # pragma: no cover def set_and_publish(self, topic, msg, pipe=None, expire: int = None): if pipe: - pipe._pipe_buffer.append(("set_and_publish", (topic, msg), {"expire": expire})) + pipe._pipe_buffer.append(("set_and_publish", (topic.endpoint, msg), {"expire": expire})) return self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) @@ -592,6 +586,8 @@ class ConnectorMock(ConnectorBase): # pragma: no cover return [] def get(self, topic, pipe=None): + if isinstance(topic, EndpointInfo): + topic = topic.endpoint if pipe: pipe._pipe_buffer.append(("get", (topic,), {})) return diff --git a/bec_lib/tests/test_async_data.py b/bec_lib/tests/test_async_data.py index 316e6719..530269f9 100644 --- a/bec_lib/tests/test_async_data.py +++ b/bec_lib/tests/test_async_data.py @@ -89,8 +89,8 @@ def test_get_async_data_for_scan(): producer = mock.MagicMock() async_data = AsyncDataHandler(producer) producer.keys.return_value = [ - MessageEndpoints.device_async_readback("scanID", "samx").encode(), - MessageEndpoints.device_async_readback("scanID", "samy").encode(), + MessageEndpoints.device_async_readback("scanID", "samx").endpoint.encode(), + MessageEndpoints.device_async_readback("scanID", "samy").endpoint.encode(), ] with mock.patch.object(async_data, "get_async_data_for_device") as mock_get: async_data.get_async_data_for_scan("scanID") diff --git a/bec_lib/tests/test_bec_service.py b/bec_lib/tests/test_bec_service.py index 2cd0e9a8..b9d8711a 100644 --- a/bec_lib/tests/test_bec_service.py +++ b/bec_lib/tests/test_bec_service.py @@ -116,8 +116,8 @@ def test_bec_service_service_status(): def test_bec_service_update_existing_services(): service_keys = [ - MessageEndpoints.service_status("service1").encode(), - MessageEndpoints.service_status("service2").encode(), + MessageEndpoints.service_status("service1").endpoint.encode(), + MessageEndpoints.service_status("service2").endpoint.encode(), ] service_msgs = [ messages.StatusMessage(name="service1", status=BECStatus.RUNNING, info={}, metadata={}), @@ -136,8 +136,8 @@ def test_bec_service_update_existing_services(): def test_bec_service_update_existing_services_ignores_wrong_msgs(): service_keys = [ - MessageEndpoints.service_status("service1").encode(), - MessageEndpoints.service_status("service2").encode(), + MessageEndpoints.service_status("service1").endpoint.encode(), + MessageEndpoints.service_status("service2").endpoint.encode(), ] service_msgs = [ messages.StatusMessage(name="service1", status=BECStatus.RUNNING, info={}, metadata={}), diff --git a/bec_lib/tests/test_scan_items.py b/bec_lib/tests/test_scan_items.py index 0ec441d5..cf1ebb31 100644 --- a/bec_lib/tests/test_scan_items.py +++ b/bec_lib/tests/test_scan_items.py @@ -63,7 +63,7 @@ from bec_lib.tests.utils import ConnectorMock ) def test_update_with_queue_status(queue_msg): scan_manager = ScanManager(ConnectorMock("")) - scan_manager.connector._get_buffer[MessageEndpoints.scan_queue_status()] = queue_msg + scan_manager.connector._get_buffer[MessageEndpoints.scan_queue_status().endpoint] = queue_msg scan_manager.update_with_queue_status(queue_msg) assert ( scan_manager.scan_storage.find_scan_by_ID("bfa582aa-f9cd-4258-ab5d-3e5d54d3dde5") diff --git a/device_server/device_server/device_server.py b/device_server/device_server/device_server.py index e3ceda81..43226647 100644 --- a/device_server/device_server/device_server.py +++ b/device_server/device_server/device_server.py @@ -5,13 +5,13 @@ import traceback from concurrent.futures import ThreadPoolExecutor import ophyd +from ophyd import Kind, OphydObject, Staged +from ophyd.utils import errors as ophyd_errors + from bec_lib import Alarms, BECService, MessageEndpoints, bec_logger, messages from bec_lib.connector import ConnectorBase from bec_lib.device import OnFailure from bec_lib.messages import BECStatus -from ophyd import Kind, OphydObject, Staged -from ophyd.utils import errors as ophyd_errors - from device_server.devices import rgetattr from device_server.devices.devicemanager import DeviceManagerDS from device_server.rpc_mixin import RPCMixin @@ -39,8 +39,7 @@ class DeviceServer(RPCMixin, BECService): self._tasks = [] self.device_manager = None self.connector.register( - MessageEndpoints.scan_queue_modification(), - cb=self.register_interception_callback, + MessageEndpoints.scan_queue_modification(), cb=self.register_interception_callback ) self.executor = ThreadPoolExecutor(max_workers=4) self._start_device_manager() @@ -296,7 +295,7 @@ class DeviceServer(RPCMixin, BECService): response = status.instruction.metadata.get("response") if response: self.connector.lpush( - MessageEndpoints.device_req_status(status.instruction.metadata["RID"]), + MessageEndpoints.device_req_status_container(status.instruction.metadata["RID"]), dev_msg, pipe, expire=18000, diff --git a/device_server/device_server/devices/devicemanager.py b/device_server/device_server/devices/devicemanager.py index 8bf3843d..bf66d2ad 100644 --- a/device_server/device_server/devices/devicemanager.py +++ b/device_server/device_server/devices/devicemanager.py @@ -10,6 +10,10 @@ import numpy as np import ophyd import ophyd.sim as ops import ophyd_devices as opd +from ophyd.ophydobj import OphydObject +from ophyd.signal import EpicsSignalBase +from typeguard import typechecked + from bec_lib import ( BECService, DeviceBase, @@ -22,9 +26,6 @@ from bec_lib import ( from bec_lib.connector import ConnectorBase from device_server.devices.config_update_handler import ConfigUpdateHandler from device_server.devices.device_serializer import get_device_info -from ophyd.ophydobj import OphydObject -from ophyd.signal import EpicsSignalBase -from typeguard import typechecked try: from bec_plugins import devices as plugin_devices @@ -64,7 +65,9 @@ class DSDevice(DeviceBase): limits = None pipe = connector.pipeline() connector.set_and_publish(MessageEndpoints.device_readback(self.name), dev_msg, pipe=pipe) - connector.set(topic=MessageEndpoints.device_read(self.name), msg=dev_msg, pipe=pipe) + connector.set_and_publish( + topic=MessageEndpoints.device_read(self.name), msg=dev_msg, pipe=pipe + ) connector.set_and_publish( MessageEndpoints.device_read_configuration(self.name), dev_config_msg, pipe=pipe ) @@ -476,7 +479,7 @@ class DeviceManagerDS(DeviceManagerBase): device = kwargs["obj"].root.name status = 0 metadata = self.devices[device].metadata - self.connector.send( + self.connector.set( MessageEndpoints.device_status(device), messages.DeviceStatusMessage(device=device, status=status, metadata=metadata), ) diff --git a/device_server/tests/test_device_manager_ds.py b/device_server/tests/test_device_manager_ds.py index 607e9c1b..f72f2fc0 100644 --- a/device_server/tests/test_device_manager_ds.py +++ b/device_server/tests/test_device_manager_ds.py @@ -141,8 +141,8 @@ def test_flyer_event_callback(): assert progress[0] == "set_and_publish" # check endpoint - assert bundle[1][0] == MessageEndpoints.device_read("samx") - assert progress[1][0] == MessageEndpoints.device_progress("samx") + assert bundle[1][0] == MessageEndpoints.device_read("samx").endpoint + assert progress[1][0] == MessageEndpoints.device_progress("samx").endpoint # check message bundle_msg = bundle[1][1] diff --git a/device_server/tests/test_device_server.py b/device_server/tests/test_device_server.py index 0f69f8c8..fa37256d 100644 --- a/device_server/tests/test_device_server.py +++ b/device_server/tests/test_device_server.py @@ -621,6 +621,7 @@ def test_kickoff_device(device_server_mock, instr): kickoff.assert_called_once() +@pytest.mark.timeout(5) @pytest.mark.parametrize( "instr", [ @@ -639,7 +640,7 @@ def test_set_device(device_server_mock, instr): res = [ msg for msg in device_server.connector.message_sent - if msg["queue"] == MessageEndpoints.device_req_status("samx") + if msg["queue"] == MessageEndpoints.device_req_status("samx").endpoint ] if res: break @@ -675,7 +676,7 @@ def test_read_device(device_server_mock, instr): res = [ msg for msg in device_server.connector.message_sent - if msg["queue"] == MessageEndpoints.device_read(device) + if msg["queue"] == MessageEndpoints.device_read(device).endpoint ] assert res[-1]["msg"].metadata["RID"] == instr.metadata["RID"] assert res[-1]["msg"].metadata["stream"] == "primary" @@ -689,12 +690,12 @@ def test_read_config_and_update_devices(device_server_mock, devices): res = [ msg for msg in device_server.connector.message_sent - if msg["queue"] == MessageEndpoints.device_read_configuration(device) + if msg["queue"] == MessageEndpoints.device_read_configuration(device).endpoint ] config = device_server.device_manager.devices[device].obj.read_configuration() msg = res[-1]["msg"] assert msg.content["signals"].keys() == config.keys() - assert res[-1]["queue"] == MessageEndpoints.device_read_configuration(device) + assert res[-1]["queue"] == MessageEndpoints.device_read_configuration(device).endpoint def test_read_and_update_devices_exception(device_server_mock): diff --git a/file_writer/file_writer/file_writer_manager.py b/file_writer/file_writer/file_writer_manager.py index 123beabf..07f70954 100644 --- a/file_writer/file_writer/file_writer_manager.py +++ b/file_writer/file_writer/file_writer_manager.py @@ -16,6 +16,7 @@ from bec_lib.alarm_handler import Alarms from bec_lib.async_data import AsyncDataHandler from bec_lib.file_utils import FileWriterMixin from bec_lib.redis_connector import MessageObject, RedisConnector + from file_writer.file_writer import NexusFileWriter logger = bec_logger.logger @@ -229,9 +230,9 @@ class FileWriterManager(BECService): return for device_key in async_device_keys: key = device_key.decode() - device_name = key.split(MessageEndpoints.device_async_readback(scanID, ""))[-1].split( - ":" - )[0] + device_name = key.split(MessageEndpoints.device_async_readback(scanID, "").endpoint)[ + -1 + ].split(":")[0] msgs = self.connector.xrange(key, min="-", max="+") if not msgs: continue diff --git a/file_writer/tests/test_file_writer_manager.py b/file_writer/tests/test_file_writer_manager.py index 9da6b8b6..12852121 100644 --- a/file_writer/tests/test_file_writer_manager.py +++ b/file_writer/tests/test_file_writer_manager.py @@ -196,7 +196,7 @@ def test_update_async_data(): file_manager.scan_storage["scanID"] = ScanStorage(10, "scanID") with mock.patch.object(file_manager, "connector") as mock_connector: with mock.patch.object(file_manager, "_process_async_data") as mock_process: - key = MessageEndpoints.device_async_readback("scanID", "dev1") + key = MessageEndpoints.device_async_readback("scanID", "dev1").endpoint mock_connector.keys.return_value = [key.encode()] data = [(b"0-0", b'{"data": "data"}')] mock_connector.xrange.return_value = data diff --git a/scan_bundler/scan_bundler/emitter.py b/scan_bundler/scan_bundler/emitter.py index b6ffdfca..c38f0409 100644 --- a/scan_bundler/scan_bundler/emitter.py +++ b/scan_bundler/scan_bundler/emitter.py @@ -44,12 +44,7 @@ class EmitterBase: msg_dump = msg msgs.append(msg_dump) if public: - self.connector.set( - public, - msg_dump, - pipe=pipe, - expire=1800, - ) + self.connector.set(public, msg_dump, pipe=pipe, expire=1800) self.connector.send(endpoint, msgs, pipe=pipe) pipe.execute() diff --git a/scan_bundler/scan_bundler/scan_bundler.py b/scan_bundler/scan_bundler/scan_bundler.py index a15402c7..09c0f6a8 100644 --- a/scan_bundler/scan_bundler/scan_bundler.py +++ b/scan_bundler/scan_bundler/scan_bundler.py @@ -72,7 +72,7 @@ class ScanBundler(BECService): def _device_read_callback(self, msg, **_kwargs): # pylint: disable=protected-access - dev = msg.topic.split(MessageEndpoints._device_read + "/")[-1] + dev = msg.topic.split(MessageEndpoints.device_read("").endpoint)[-1] msgs = msg.value logger.debug(f"Received reading from device {dev}") if not isinstance(msgs, list): @@ -251,16 +251,12 @@ class ScanBundler(BECService): ) } - def _get_scan_status_history(self, length): - return self.connector.lrange(MessageEndpoints.scan_status() + "_list", length * -1, -1) - def _wait_for_scanID(self, scanID, timeout_time=10): elapsed_time = 0 while not scanID in self.storage_initialized: - msgs = self._get_scan_status_history(5) - for msg in msgs: - if msg.content["scanID"] == scanID: - self.handle_scan_status_message(msg) + msg = self.connector.get(MessageEndpoints.public_scan_info(scanID)) + if msg and msg.content["scanID"] == scanID: + self.handle_scan_status_message(msg) if scanID in self.sync_storage: if self.sync_storage[scanID]["status"] in ["closed", "aborted"]: logger.info( diff --git a/scan_bundler/tests/test_scan_bundler.py b/scan_bundler/tests/test_scan_bundler.py index 4e2575a2..a5c549d8 100644 --- a/scan_bundler/tests/test_scan_bundler.py +++ b/scan_bundler/tests/test_scan_bundler.py @@ -71,7 +71,7 @@ def test_device_read_callback(): metadata={"scanID": "laksjd", "readout_priority": "monitored"}, ) msg.value = dev_msg - msg.topic = MessageEndpoints.device_read("samx") + msg.topic = MessageEndpoints.device_read("samx").endpoint with mock.patch.object(scan_bundler, "_add_device_to_storage") as add_dev: scan_bundler._device_read_callback(msg) @@ -81,55 +81,43 @@ def test_device_read_callback(): @pytest.mark.parametrize( "scanID,storageID,scan_msg", [ - ("adlk-jalskdj", None, []), + ("adlk-jalskdj", None, None), ( "adlk-jalskdjs", "adlk-jalskdjs", - [ - messages.ScanStatusMessage( - scanID="adlk-jalskdjs", - status="open", - info={ - "scan_motors": ["samx"], - "readout_priority": { - "monitored": ["samx"], - "baseline": [], - "on_request": [], - }, - "queueID": "my-queue-ID", - "scan_number": 5, - "scan_type": "step", - }, - ) - ], + messages.ScanStatusMessage( + scanID="adlk-jalskdjs", + status="open", + info={ + "scan_motors": ["samx"], + "readout_priority": {"monitored": ["samx"], "baseline": [], "on_request": []}, + "queueID": "my-queue-ID", + "scan_number": 5, + "scan_type": "step", + }, + ), ), ( "adlk-jalskdjs", "", - [ - messages.ScanStatusMessage( - scanID="adlk-jalskdjs", - status="open", - info={ - "scan_motors": ["samx"], - "readout_priority": { - "monitored": ["samx"], - "baseline": [], - "on_request": [], - }, - "queueID": "my-queue-ID", - "scan_number": 5, - "scan_type": "step", - }, - ) - ], + messages.ScanStatusMessage( + scanID="adlk-jalskdjs", + status="open", + info={ + "scan_motors": ["samx"], + "readout_priority": {"monitored": ["samx"], "baseline": [], "on_request": []}, + "queueID": "my-queue-ID", + "scan_number": 5, + "scan_type": "step", + }, + ), ), ], ) def test_wait_for_scanID(scanID, storageID, scan_msg): sb = load_ScanBundlerMock() sb.storage_initialized.add(storageID) - with mock.patch.object(sb, "_get_scan_status_history", return_value=scan_msg) as get_scan_msgs: + with mock.patch.object(sb.connector, "get", return_value=scan_msg) as get_scan_msgs: if not storageID and not scan_msg: with pytest.raises(TimeoutError): sb._wait_for_scanID(scanID, 1) @@ -137,32 +125,6 @@ def test_wait_for_scanID(scanID, storageID, scan_msg): sb._wait_for_scanID(scanID) -@pytest.mark.parametrize( - "msgs", - [ - [ - messages.ScanStatusMessage( - scanID="scanID", - status="open", - info={ - "primary": ["samx"], - "queueID": "my-queue-ID", - "scan_number": 5, - "scan_type": "step", - }, - ) - ], - [], - ], -) -def test_get_scan_status_history(msgs): - sb = load_ScanBundlerMock() - with mock.patch.object(sb.connector, "lrange", return_value=[msg for msg in msgs]) as lrange: - res = sb._get_scan_status_history(5) - lrange.assert_called_once_with(MessageEndpoints.scan_status() + "_list", -5, -1) - assert res == msgs - - def test_add_device_to_storage_returns_without_scanID(): msg = messages.DeviceMessage( signals={"samx": {"samx": 0.51, "setpoint": 0.5, "motor_is_moving": 0}}, diff --git a/scan_server/scan_server/scans.py b/scan_server/scan_server/scans.py index 77c0fde4..1c1d0e2f 100644 --- a/scan_server/scan_server/scans.py +++ b/scan_server/scan_server/scans.py @@ -5,6 +5,7 @@ from abc import ABC, abstractmethod from typing import Any, Literal import numpy as np + from bec_lib import DeviceManagerBase, MessageEndpoints, bec_logger, messages from .errors import LimitError, ScanAbortion @@ -561,7 +562,9 @@ class SyncFlyScanBase(ScanBase, ABC): connector = self.device_manager.connector pipe = connector.pipeline() - connector.lrange(MessageEndpoints.device_req_status(self.metadata["RID"]), 0, -1, pipe) + connector.lrange( + MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + ) connector.get(MessageEndpoints.device_readback(flyer), pipe) return connector.execute_pipeline(pipe) @@ -1321,7 +1324,9 @@ class MonitorScan(ScanBase): connector = self.device_manager.connector pipe = connector.pipeline() - connector.lrange(MessageEndpoints.device_req_status(self.metadata["RID"]), 0, -1, pipe) + connector.lrange( + MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + ) connector.get(MessageEndpoints.device_readback(self.flyer), pipe) return connector.execute_pipeline(pipe) diff --git a/scan_server/tests/test_scan_worker.py b/scan_server/tests/test_scan_worker.py index ef53af3d..dd76e83e 100644 --- a/scan_server/tests/test_scan_worker.py +++ b/scan_server/tests/test_scan_worker.py @@ -461,7 +461,7 @@ def test_check_for_failed_movements(scan_worker_mock, device_status, devices, in if abort: with pytest.raises(ScanAbortion): worker.device_manager.connector._get_buffer[ - MessageEndpoints.device_readback("samx") + MessageEndpoints.device_readback("samx").endpoint ] = messages.DeviceMessage(signals={"samx": {"value": 4}}, metadata={}) worker._check_for_failed_movements(device_status, devices, instr) else: @@ -582,9 +582,9 @@ def test_wait_for_idle(scan_worker_mock, msg1, msg2, req_msg: messages.DeviceReq with mock.patch.object( worker.validate, "get_device_status", return_value=[req_msg] ) as device_status: - worker.device_manager.connector._get_buffer[MessageEndpoints.device_readback("samx")] = ( - messages.DeviceMessage(signals={"samx": {"value": 4}}, metadata={}) - ) + worker.device_manager.connector._get_buffer[ + MessageEndpoints.device_readback("samx").endpoint + ] = messages.DeviceMessage(signals={"samx": {"value": 4}}, metadata={}) worker._add_wait_group(msg1) if req_msg.content["success"]: @@ -644,7 +644,7 @@ def test_wait_for_read(scan_worker_mock, msg1, msg2, req_msg: messages.DeviceReq assert worker._groups == {} worker._groups["scan_motor"] = {"samx": 3, "samy": 4} worker.device_manager.connector._get_buffer[ - MessageEndpoints.device_readback("samx") + MessageEndpoints.device_readback("samx").endpoint ] = messages.DeviceMessage(signals={"samx": {"value": 4}}, metadata={}) worker._add_wait_group(msg1) worker._wait_for_read(msg2) @@ -1289,7 +1289,7 @@ def test_send_scan_status(scan_worker_mock, status, expire): scan_info_msgs = [ msg for msg in worker.device_manager.connector.message_sent - if msg["queue"] == MessageEndpoints.public_scan_info(scanID=worker.current_scanID) + if msg["queue"] == MessageEndpoints.public_scan_info(scanID=worker.current_scanID).endpoint ] assert len(scan_info_msgs) == 1 assert scan_info_msgs[0]["expire"] == expire