mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-22 02:20:02 +02:00
fix(scan_server): restart queue if worker died
This commit is contained in:
parent
1fa372b590
commit
a59eb9c888
@ -87,9 +87,16 @@ class QueueManager:
|
||||
@threadlocked
|
||||
def add_queue(self, queue_name: str) -> None:
|
||||
"""add a new queue to the queue manager"""
|
||||
if queue_name not in self.queues:
|
||||
self.queues[queue_name] = ScanQueue(self, queue_name=queue_name)
|
||||
self.queues[queue_name].start_worker()
|
||||
if queue_name in self.queues:
|
||||
queue = self.queues[queue_name]
|
||||
if not queue.scan_worker.is_alive():
|
||||
logger.info(f"Restarting worker for queue {queue_name}")
|
||||
queue.clear()
|
||||
self.queues[queue_name] = ScanQueue(self, queue_name=queue_name)
|
||||
self.queues[queue_name].start_worker()
|
||||
return
|
||||
self.queues[queue_name] = ScanQueue(self, queue_name=queue_name)
|
||||
self.queues[queue_name].start_worker()
|
||||
|
||||
def _start_scan_queue_register(self) -> None:
|
||||
self.connector.register(
|
||||
|
@ -894,8 +894,6 @@ class ScanWorker(threading.Thread):
|
||||
self.parent.queue_manager.queues[self.queue_name].abort()
|
||||
self.reset()
|
||||
logger.error(f"Scan worker stopped: {exc}. Unrecoverable error.")
|
||||
finally:
|
||||
self.connector.shutdown()
|
||||
|
||||
def shutdown(self):
|
||||
"""shutdown the scan worker"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user