bec/bec_lib/bec_lib/request_items.py

188 lines
6.8 KiB
Python

"""
This module contains the RequestItem class and RequestStorage class. The RequestItem class is used to store information
about a scan request. The RequestStorage class is used to store request items.
"""
from __future__ import annotations
import threading
from collections import deque
from typing import TYPE_CHECKING
from bec_lib import messages
from bec_lib.callback_handler import CallbackHandler
from bec_lib.logger import bec_logger
from bec_lib.utils import threadlocked
logger = bec_logger.logger
if TYPE_CHECKING:
from bec_lib.queue_items import QueueItem
from bec_lib.scan_items import ScanItem
from bec_lib.scan_manager import ScanManager
class RequestItem:
    """Client-side record of a single scan request.

    A request item collects everything received for one request ID: the
    request message, the response message and any client info messages. It
    also gives access to the queue item and scan item that belong to the
    request via the scan manager's queue storage.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        scan_manager: ScanManager,
        requestID: str,
        decision_pending: bool = True,
        scan_id: str | None = None,
        request=None,
        response=None,
        client_messages: list | None = None,
        client_messages_asap: list | None = None,
        accepted: list | bool | None = None,
        **_kwargs,
    ) -> None:
        """Create a request item.

        Args:
            scan_manager: Scan manager whose ``queue_storage`` is used to look up
                the queue item / scan item of this request.
            requestID: ID of the request.
            decision_pending: Whether the accept/decline decision is still open.
            scan_id: Scan ID stored on the item.
            request: Request message, if already known.
            response: Response message, if already known.
            client_messages: Initial list of client messages. A new empty list
                is created per instance when omitted.
            client_messages_asap: Initial list of "asap" client messages. A new
                empty list is created per instance when omitted.
            accepted: Acceptance state. The methods of this class store it as a
                single-element list.
            **_kwargs: Ignored.
        """
        self.scan_manager = scan_manager
        self.requestID = requestID
        self.request = request
        self.response = response
        self.accepted = accepted
        self._decision_pending = decision_pending
        self._scan_id = scan_id
        # TODO #286
        # Defaults must not be shared list literals: a mutable default is created
        # once at function definition time and would be appended to by every
        # instance that did not pass its own list.
        self.client_messages = [] if client_messages is None else client_messages
        self.client_messages_asap = (
            [] if client_messages_asap is None else client_messages_asap
        )
        self.callbacks = CallbackHandler()

    def update_with_response(self, response: messages.RequestResponseMessage):
        """Update the current request item with a RequestResponseMessage.

        Marks the decision as made, adopts the request ID from the response
        metadata and stores the response's ``accepted`` value wrapped in a
        single-element list.
        """
        self.response = response
        self._decision_pending = False
        self.requestID = response.metadata["RID"]
        self.accepted = [response.content["accepted"]]

    def update_with_request(self, request: messages.ScanQueueMessage):
        """Update the current request item with a ScanQueueMessage (request message)."""
        self.request = request
        self.requestID = request.metadata["RID"]

    def update_with_client_message(self, message: messages.ClientInfoMessage):
        """Update the current request item with a ClientInfoMessage."""
        self.client_messages.append(message)
        # TODO #286
        # if message.show_asap:
        # Until the check above is enabled, every message is also queued as "asap".
        self.client_messages_asap.append(message)

    @property
    def decision_pending(self) -> bool:
        """Indicate whether the decision to accept or decline the request is still open.

        If no decision has been recorded yet but a scan item can be resolved for
        this request, the request is marked as accepted and no longer pending.
        """
        if not self._decision_pending:
            return self._decision_pending
        if self.scan:
            self._decision_pending = False
            self.accepted = [True]
        return self._decision_pending

    @decision_pending.setter
    def decision_pending(self, val: bool) -> None:
        self._decision_pending = val

    @classmethod
    def from_response(cls, scan_manager: ScanManager, response: messages.RequestResponseMessage):
        """Initialize a request item from a RequestResponseMessage."""
        return cls(
            scan_manager=scan_manager,
            requestID=response.metadata["RID"],
            response=response,
            decision_pending=False,
            accepted=[response.content["accepted"]],
        )

    @classmethod
    def from_request(cls, scan_manager: ScanManager, request: messages.ScanQueueMessage):
        """Initialize a request item from a ScanQueueMessage (request message)."""
        return cls(
            scan_manager=scan_manager, requestID=request.metadata["RID"], request=request
        )

    @classmethod
    def from_client_message(cls, scan_manager: ScanManager, message: messages.ClientInfoMessage):
        """Initialize a request item from a ClientInfoMessage."""
        # NOTE(review): only client_messages is seeded here, whereas
        # update_with_client_message appends to client_messages_asap as well.
        # Confirm whether that difference is intended.
        return cls(scan_manager=scan_manager, requestID=message.RID, client_messages=[message])

    @property
    def scan(self) -> ScanItem | None:
        """Get the scan item for this request item.

        Returns None when no queue item contains this request ID. Otherwise the
        scan is taken from ``queue_item.scans`` at the position this request ID
        has in ``queue_item.requestIDs``.
        """
        queue_item = self.queue
        if not queue_item:
            return None
        request_index = queue_item.requestIDs.index(self.requestID)
        return queue_item.scans[request_index]

    @property
    def queue(self) -> QueueItem | None:
        """Get the queue item for this request item, as found by the queue storage."""
        return self.scan_manager.queue_storage.find_queue_item_by_requestID(self.requestID)
class RequestStorage:
    """Bounded container of request items.

    Items are kept in a ``deque`` with a maximum length, so appending to a full
    storage drops the oldest entry. The public methods are wrapped with
    ``threadlocked`` (presumably serialising access on ``self._lock`` -- confirm
    against ``bec_lib.utils``).
    """

    def __init__(self, scan_manager: ScanManager, maxlen=50) -> None:
        self.scan_manager = scan_manager
        self._lock = threading.RLock()
        self.storage: deque[RequestItem] = deque(maxlen=maxlen)

    @threadlocked
    def find_request_by_ID(self, requestID: str) -> RequestItem | None:
        """Return the first stored request item with the given request ID, or None."""
        candidates = (item for item in self.storage if item.requestID == requestID)
        return next(candidates, None)

    @threadlocked
    def update_with_response(self, response_msg: messages.RequestResponseMessage) -> None:
        """Create or update a request item from a new RequestResponseMessage."""
        existing = self.find_request_by_ID(response_msg.metadata.get("RID"))
        if not existing:
            # The response may arrive before the request, so there is nothing to
            # update yet; start a new item from the response instead.
            self.storage.append(RequestItem.from_response(self.scan_manager, response_msg))
            logger.debug("Scan queue request does not exist. Creating from response.")
            return
        existing.update_with_response(response_msg)
        logger.debug("Scan queue request exists. Updating with response.")

    @threadlocked
    def update_with_request(self, request_msg: messages.ScanQueueMessage) -> None:
        """Create or update a request item from a new ScanQueueMessage (request message).

        Messages without metadata or without a request ID are ignored.
        """
        metadata = request_msg.metadata
        rid = metadata.get("RID") if metadata else None
        if not rid:
            return
        existing = self.find_request_by_ID(rid)
        if not existing:
            self.storage.append(RequestItem.from_request(self.scan_manager, request_msg))
            return
        existing.update_with_request(request_msg)

    @threadlocked
    def update_with_client_message(self, client_message: messages.ClientInfoMessage) -> None:
        """Create or update a request item from a new ClientInfoMessage.

        Messages without a request ID are ignored.
        """
        rid = client_message.RID
        if not rid:
            return
        existing = self.find_request_by_ID(rid)
        if not existing:
            self.storage.append(
                RequestItem.from_client_message(self.scan_manager, client_message)
            )
            return
        existing.update_with_client_message(client_message)