re-wrote _update_available_scans; naming

This commit is contained in:
augustin_s 2022-06-30 17:49:17 +02:00
parent 4804bb8b75
commit d24d3cb397

View File

@ -7,10 +7,10 @@ import msgpack
from bec_utils import BECMessage, MessageEndpoints
from bec_utils.connector import ConnectorBase
import koss.scans as kossScans
import koss.scans as koss_scans
from .bkqueue import QueueManager
from .devicemanager import DeviceManagerKOSS
from .devicemanager import DeviceManagerKOSS as DeviceManager #TODO rename original?
from .scan_assembler import ScanAssembler
from .scan_guard import ScanGuard
from .scan_worker import ScanWorker
@ -19,7 +19,8 @@ logger = logging.getLogger(__name__)
class KOSS:
dm = None
device_manager = None
queue_manager = None
scan_guard = None
scan_server = None
scan_assembler = None
@ -33,23 +34,23 @@ class KOSS:
self.producer = self.connector.producer()
self._update_available_scans()
self._start_queue_manager()
self._start_devicemanager()
self._start_device_manager()
self._start_scan_guard()
self._start_scan_assembler()
self._start_scan_server()
self._publish_available_scans()
self._start_alarm_handler()
def _start_devicemanager(self):
self.dm = DeviceManagerKOSS(self.connector, self.scibec_url)
self.dm.initialize([self.bootstrap_server])
def _start_device_manager(self):
self.device_manager = DeviceManager(self.connector, self.scibec_url)
self.device_manager.initialize([self.bootstrap_server])
def _start_scan_server(self):
self.scan_worker = ScanWorker(parent=self)
self.scan_worker.start()
def _start_queue_manager(self):
self.qm = QueueManager(parent=self)
self.queue_manager = QueueManager(parent=self)
def _start_scan_assembler(self):
self.scan_assembler = ScanAssembler(parent=self)
@ -57,26 +58,26 @@ class KOSS:
def _start_scan_guard(self):
self.scan_guard = ScanGuard(parent=self)
def _update_available_scans(self):
for name, val in inspect.getmembers(kossScans):
for name, val in inspect.getmembers(koss_scans): #TODO: use vars() ?
try:
if issubclass(val, kossScans.RequestBase):
if val.scan_name == "":
logger.debug(f"Ignoring {name}")
self.scan_dict[val.scan_name] = {
"class": val.__name__,
"arg_input": val.arg_input,
"required_kwargs": val.required_kwargs,
"scan_report_hint": val.scan_report_hint,
}
doc = None
if val.__doc__ is not None:
doc = val.__doc__
elif val.__init__ is not None:
doc = val.__init__.__doc__
self.scan_dict[val.scan_name]["doc"] = doc
is_scan = issubclass(val, koss_scans.RequestBase)
except TypeError:
is_scan = False
if not is_scan or not val.scan_name:
logger.debug(f"Ignoring {name}")
continue
self.scan_dict[val.scan_name] = {
"class": val.__name__,
"arg_input": val.arg_input,
"required_kwargs": val.required_kwargs,
"scan_report_hint": val.scan_report_hint,
"doc": val.__doc__ or val.__init__.__doc__,
}
def _publish_available_scans(self):
self.producer.set(MessageEndpoints.available_scans(), msgpack.dumps(self.scan_dict))
@ -91,15 +92,18 @@ class KOSS:
@staticmethod
def _alarm_callback(msg, parent: KOSS, **_kwargs):
msg = BECMessage.AlarmMessage.loads(msg.value)
if "scanID" in msg.metadata:
parent.qm._set_abort(scanID=msg.metadata["scanID"], queue=msg.metadata["stream"])
md = BECMessage.AlarmMessage.loads(msg.value).metadata
scanID = md.get("scanID")
queue = md.get("stream")
if scanID and queue:
parent.queue_manager._set_abort(scanID=scanID, queue=queue)
def load_config_from_disk(self, file_path):
self.dm.load_config_from_disk(file_path)
self.device_manager.load_config_from_disk(file_path)
def shutdown(self):
self.dm.shutdown()
self.qm.shutdown()
self.scan_worker.signal_event.set()
self.device_manager.shutdown()
self.queue_manager.shutdown()
self.scan_worker.signal_event.set() #TODO: this should be a shutdown method, too
self.scan_worker.join()