diff --git a/bec_client/bec_client/callbacks/ipython_live_updates.py b/bec_client/bec_client/callbacks/ipython_live_updates.py index ed2bb4ce..04618cb2 100644 --- a/bec_client/bec_client/callbacks/ipython_live_updates.py +++ b/bec_client/bec_client/callbacks/ipython_live_updates.py @@ -5,19 +5,21 @@ import collections import time from typing import TYPE_CHECKING -from bec_client.callbacks.scan_progress import LiveUpdatesScanProgress from bec_lib import bec_logger from bec_lib.bec_errors import ScanInterruption +from bec_client.callbacks.scan_progress import LiveUpdatesScanProgress + from .live_table import LiveUpdatesTable from .move_device import LiveUpdatesReadbackProgressbar from .utils import ScanRequestMixin, check_alarms if TYPE_CHECKING: - from bec_client import BECClient from bec_lib import messages from bec_lib.queue_items import QueueItem + from bec_client import BECClient + logger = bec_logger.logger @@ -155,6 +157,7 @@ class IPythonLiveUpdates: except ScanInterruption as scan_interr: self._interrupted_request = (request, scan_report_type) + self._reset(forced=True) raise scan_interr def _process_queue( @@ -179,7 +182,8 @@ class IPythonLiveUpdates: "status" ) print( - f"Scan is enqueued and is waiting for execution. Current position in queue: {queue.queue_position + 1}. Queue status: {status}.", + "Scan is enqueued and is waiting for execution. Current position in queue:" + f" {queue.queue_position + 1}. Queue status: {status}.", end="\r", flush=True, ) @@ -202,12 +206,12 @@ class IPythonLiveUpdates: return False - def _reset(self): + def _reset(self, forced=False): self._interrupted_request = None self._user_callback = None self._processed_instructions = 0 - scan_closed = self._active_request.content["scan_type"] == "close_scan_def" + scan_closed = forced or (self._active_request.content["scan_type"] == "close_scan_def") self._active_request = None if self.client.scans._scan_def_id and not scan_closed: diff --git a/bec_lib/bec_lib/scans.py b/bec_lib/bec_lib/scans.py index 1ff77d8e..acc5c83d 100644 --- a/bec_lib/bec_lib/scans.py +++ b/bec_lib/bec_lib/scans.py @@ -282,7 +282,8 @@ class ScanDef(ContextDecorator): return self def __exit__(self, *exc): - self.parent.close_scan_def() + if exc[0] is None: + self.parent.close_scan_def() self.parent._scan_def_id = None diff --git a/scan_server/scan_server/scan_queue.py b/scan_server/scan_server/scan_queue.py index d0a541f8..23880cf3 100644 --- a/scan_server/scan_server/scan_queue.py +++ b/scan_server/scan_server/scan_queue.py @@ -8,11 +8,10 @@ import traceback import uuid from enum import Enum +from bec_lib import Alarms, MessageEndpoints, bec_logger, messages, threadlocked from rich.console import Console from rich.table import Table -from bec_lib import Alarms, MessageEndpoints, bec_logger, messages, threadlocked - from .errors import LimitError, ScanAbortion from .scan_assembler import ScanAssembler from .scans import ScanBase @@ -94,9 +93,7 @@ class QueueManager: def _start_scan_queue_consumer(self) -> None: self._scan_queue_consumer = self.connector.consumer( - MessageEndpoints.scan_queue_insert(), - cb=self._scan_queue_callback, - parent=self, + MessageEndpoints.scan_queue_insert(), cb=self._scan_queue_callback, parent=self ) self._scan_queue_modification_consumer = self.connector.consumer( MessageEndpoints.scan_queue_modification(), @@ -622,6 +619,8 @@ class RequestBlockQueue: def increase_scan_number(self) -> None: """increase the scan number counter""" rbl = self.active_rb + if rbl is None: + return if not rbl.is_scan and rbl.scan_def_id is None: return if rbl.scan_def_id is None or rbl.msg.content["scan_type"] == "close_scan_def": @@ -745,6 +744,7 @@ class InstructionQueueItem: def abort(self) -> None: """abort and clear all the instructions from the instruction queue""" self.instructions = iter([]) + # self.queue.request_blocks_queue.clear() def append_scan_request(self, msg): """append a scan message to the instruction queue""" @@ -826,14 +826,16 @@ class InstructionQueueItem: except StopIteration: if not self.scan_macros_complete: logger.info( - f"Waiting for new instructions or scan macro to be closed (scan def ids: {self.queue.scan_def_ids})" + "Waiting for new instructions or scan macro to be closed (scan def ids:" + f" {self.queue.scan_def_ids})" ) time.sleep(0.1) elif self.queue_group is not None and not self.queue_group_is_closed: self.queue.active_rb = None self.parent.queue_manager.send_queue_status() logger.info( - f"Waiting for new instructions or queue group to be closed (group id: {self.queue_group})" + "Waiting for new instructions or queue group to be closed (group id:" + f" {self.queue_group})" ) time.sleep(0.1) else: diff --git a/scan_server/scan_server/scan_worker.py b/scan_server/scan_server/scan_worker.py index 0be56c27..7adaff35 100644 --- a/scan_server/scan_server/scan_worker.py +++ b/scan_server/scan_server/scan_worker.py @@ -694,9 +694,9 @@ class ScanWorker(threading.Thread): queue.is_active = True try: for instr in queue: + self._check_for_interruption() if instr is None: continue - self._check_for_interruption() self._exposure_time = getattr(queue.active_request_block.scan, "exp_time", None) self._instruction_step(instr) except ScanAbortion as exc: @@ -749,6 +749,8 @@ class ScanWorker(threading.Thread): self.open_scan(instr) elif action == "close_scan" and scan_def_id is None: self.close_scan(instr, self.max_point_id) + elif action == "close_scan" and scan_def_id is not None: + pass elif action == "open_scan_def": pass elif action == "close_scan_def":