Delayed start on Aerotech tasks

This commit is contained in:
gac-x05la
2025-02-14 13:48:47 +01:00
parent b8dcda1696
commit d1e072b8d9

View File

@@ -5,7 +5,6 @@ interface.
@author: mohacsi_i
"""
from time import sleep
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
@@ -19,8 +18,8 @@ logger = bec_logger.logger
class AerotechTasksMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
"""Mixin class for self-configuration and staging"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
@@ -33,8 +32,6 @@ class AerotechTasksMixin(CustomDeviceMixin):
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
@@ -121,15 +118,15 @@ class aa1Tasks(PSIDeviceBase):
self.switch.set("Reset").wait()
# Check what we got
if "script_task" in d:
if d['script_task'] < 3 or d['script_task'] > 21:
if d["script_task"] < 3 or d["script_task"] > 21:
raise RuntimeError(f"Invalid task index: {d['script_task']}")
self.taskIndex.set(d['script_task']).wait()
self.taskIndex.set(d["script_task"]).wait()
if "script_file" in d:
self.fileName.set(d["script_file"]).wait()
if "script_text" in d:
# Compile text for syntax checking
# NOTE: This will load to 'script_file'
self._fileWrite.set(d['script_text'], settle_time=0.2).wait()
self._fileWrite.set(d["script_text"], settle_time=0.2).wait()
self.switch.set("Load").wait()
# Check the result of load
if self._failure.value:
@@ -139,14 +136,23 @@ class aa1Tasks(PSIDeviceBase):
return (old, new)
def bluestage(self) -> None:
"""Bluesky style stage"""
"""Bluesky style stage, prepare, but does not execute"""
if self.taskIndex.get() in (0, 1, 2):
logger.error(f"[{self.name}] Launching AeroScript on system task. Daring today are we?")
# Launch and check success
status = self.switch.set("Run", settle_time=0.2)
logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?")
# Load and check success
status = self.switch.set("Load", settle_time=0.2)
status.wait()
if self._failure.value:
raise RuntimeError("Failed to kick off task, please check the Aerotech IOC")
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status
def bluekickoff(self):
"""Bluesky style kickoff"""
# Launch and check success
status = self.switch.set("Start", settle_time=0.2)
status.wait()
if self._failure.value:
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status
##########################################################################
@@ -156,7 +162,7 @@ class aa1Tasks(PSIDeviceBase):
timestamp_ = 0
task_idx = int(self.taskIndex.get())
def not_running(*args, value, timestamp, **kwargs):
def not_running(*, value, timestamp, **_):
nonlocal timestamp_
result = value[task_idx] not in ["Running", 4]
timestamp_ = timestamp