fix: remove copy/paste bug
This commit is contained in:
@ -23,90 +23,87 @@ but they are executed in a specific order:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# imports in ScanBase
|
# imports in ScanBase
|
||||||
#from __future__ import annotations
|
# from __future__ import annotations
|
||||||
|
|
||||||
#import ast
|
# import ast
|
||||||
#import enum
|
# import enum
|
||||||
#import threading
|
# import threading
|
||||||
#import time
|
# import time
|
||||||
#import uuid
|
# import uuid
|
||||||
#from abc import ABC, abstractmethod
|
# from abc import ABC, abstractmethod
|
||||||
#from typing import Any, Literal
|
# from typing import Any, Literal
|
||||||
|
|
||||||
#import numpy as np
|
# import numpy as np
|
||||||
|
|
||||||
#from bec_lib.device import DeviceBase
|
# from bec_lib.device import DeviceBase
|
||||||
#from bec_lib.devicemanager import DeviceManagerBase
|
# from bec_lib.devicemanager import DeviceManagerBase
|
||||||
#from bec_lib.endpoints import MessageEndpoints
|
# from bec_lib.endpoints import MessageEndpoints
|
||||||
#from bec_lib.logger import bec_logger
|
# from bec_lib.logger import bec_logger
|
||||||
|
|
||||||
#from .errors import LimitError, ScanAbortion
|
# from .errors import LimitError, ScanAbortion
|
||||||
#from .path_optimization import PathOptimizerMixin
|
# from .path_optimization import PathOptimizerMixin
|
||||||
#from .scan_stubs import ScanStubs
|
# from .scan_stubs import ScanStubs
|
||||||
# end imports in ScanBase
|
# end imports in ScanBase
|
||||||
|
|
||||||
# import time
|
# import time
|
||||||
|
|
||||||
# import numpy as np
|
# import numpy as np
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
# from bec_lib.endpoints import MessageEndpoints
|
# from bec_lib.endpoints import MessageEndpoints
|
||||||
from bec_lib.logger import bec_logger
|
from bec_lib.logger import bec_logger
|
||||||
|
from bec_server.scan_server.scans import ScanArgType, ScanBase
|
||||||
|
|
||||||
|
from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||||
|
|
||||||
# from bec_lib import messages
|
# from bec_lib import messages
|
||||||
# from bec_server.scan_server.errors import ScanAbortion
|
# from bec_server.scan_server.errors import ScanAbortion
|
||||||
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
|
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
|
||||||
|
|
||||||
# logger = bec_logger.logger
|
# logger = bec_logger.logger
|
||||||
|
|
||||||
from bec_server.scan_server.scans import ScanBase, ScanArgType
|
|
||||||
import numpy as np
|
|
||||||
import time
|
|
||||||
from bec_lib.logger import bec_logger
|
|
||||||
from phoenix_bec.scripts.phoenix import PhoenixBL
|
|
||||||
|
|
||||||
|
|
||||||
logger = bec_logger.logger
|
logger = bec_logger.logger
|
||||||
|
|
||||||
|
|
||||||
class LogTime():
|
class LogTime:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
logger.success('init LogTime')
|
logger.success("init LogTime")
|
||||||
self.t0=time.time()
|
self.t0 = time.time()
|
||||||
|
|
||||||
def p_s(self,x):
|
def p_s(self, x):
|
||||||
now=time.time()
|
now = time.time()
|
||||||
#delta=now-self.t0
|
# delta=now-self.t0
|
||||||
m=str(now)+' sec '+x
|
m = str(now) + " sec " + x
|
||||||
logger.success(m)custom_prepare_cls(parent=self, **kwargs)
|
logger.success(m)
|
||||||
# making the instance of PSID
|
# making the instance of PSID
|
||||||
#self.t0=now
|
# self.t0=now
|
||||||
file=open('MyLogfile.txt','a')
|
file = open("MyLogfile.txt", "a")
|
||||||
file.write(m+'\n')
|
file.write(m + "\n")
|
||||||
file.close
|
file.close
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PhoenixScanBaseTTL(ScanBase):
|
class PhoenixScanBaseTTL(ScanBase):
|
||||||
"""
|
"""
|
||||||
Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan')
|
Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan')
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def scan_core(self):
|
def scan_core(self):
|
||||||
"""perform the scan core procedure"""
|
"""perform the scan core procedure"""
|
||||||
self.p_s('PhoenixScanBaseTT.scan_core')
|
self.p_s("PhoenixScanBaseTT.scan_core")
|
||||||
for ind, pos in self._get_position():
|
for ind, pos in self._get_position():
|
||||||
for self.burst_index in range(self.burst_at_each_point):
|
for self.burst_index in range(self.burst_at_each_point):
|
||||||
self.p_s('PhoenixScanBaseTT.scan_core in loop ')
|
self.p_s("PhoenixScanBaseTT.scan_core in loop ")
|
||||||
|
|
||||||
yield from self._at_each_point(ind, pos)
|
yield from self._at_each_point(ind, pos)
|
||||||
self.burst_index = 0
|
self.burst_index = 0
|
||||||
|
|
||||||
def _at_each_point(self, ind=None, pos=None):
|
def _at_each_point(self, ind=None, pos=None):
|
||||||
self.p_s('PhoenixScanBaseTT._at_each_point')
|
self.p_s("PhoenixScanBaseTT._at_each_point")
|
||||||
yield from self._move_scan_motors_and_wait(pos)
|
yield from self._move_scan_motors_and_wait(pos)
|
||||||
if ind > 0:
|
if ind > 0:
|
||||||
yield from self.stubs.wait(
|
yield from self.stubs.wait(
|
||||||
@ -123,12 +120,11 @@ class PhoenixScanBaseTTL(ScanBase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.point_id += 1
|
self.point_id += 1
|
||||||
self.p_s('done')
|
self.p_s("done")
|
||||||
|
|
||||||
|
|
||||||
class PhoenixLineScan(PhoenixScanBaseTTL):
|
class PhoenixLineScan(PhoenixScanBaseTTL):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
scan_name = "phoenix_line_scan"
|
scan_name = "phoenix_line_scan"
|
||||||
required_kwargs = ["steps", "relative"]
|
required_kwargs = ["steps", "relative"]
|
||||||
arg_input = {
|
arg_input = {
|
||||||
@ -149,25 +145,25 @@ class PhoenixLineScan(PhoenixScanBaseTTL):
|
|||||||
steps: int = None,
|
steps: int = None,
|
||||||
relative: bool = False,
|
relative: bool = False,
|
||||||
burst_at_each_point: int = 1,
|
burst_at_each_point: int = 1,
|
||||||
setup_device:str= None,
|
setup_device: str = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
A phoenix line scan for one or more motors.
|
A phoenix line scan for one or more motors.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
*args (Device, float, float): pairs of device / start position / end position
|
*args (Device, float, float): pairs of device / start position / end position
|
||||||
exp_time (float): exposure time in s. Default: 0
|
exp_time (float): exposure time in s. Default: 0
|
||||||
steps (int): number of steps. Default: 10
|
steps (int): number of steps. Default: 10
|
||||||
relative (bool): if True, the start and end positions are relative to the current position. Default: False
|
relative (bool): if True, the start and end positions are relative to the current position. Default: False
|
||||||
burst_a Specifies the level of type checking analysis to perform.
|
burst_a Specifies the level of type checking analysis to perform.
|
||||||
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
|
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
#from phoenix_bec.scripts.phoenix import PhoenixBL
|
# from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||||
self.p_s=PhoenixBL.my_log
|
self.p_s = PhoenixBL.my_log
|
||||||
|
|
||||||
self.p_s('init scripts.phoenix.scans.PhoenixLineScan')
|
self.p_s("init scripts.phoenix.scans.PhoenixLineScan")
|
||||||
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
|
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
|
||||||
@ -176,14 +172,13 @@ ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, rela
|
|||||||
self.setup_device = setup_device
|
self.setup_device = setup_device
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.p_s('done')
|
self.p_s("done")
|
||||||
|
|
||||||
def _calculate_positions(self) -> None:
|
def _calculate_positions(self) -> None:
|
||||||
self.p_s('PhoenixLineScan._calculate_positions')
|
self.p_s("PhoenixLineScan._calculate_positions")
|
||||||
axis = []
|
axis = []
|
||||||
for _, val in self.caller_args.items():
|
for _, val in self.caller_args.items():
|
||||||
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
|
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
|
||||||
axis.append(ax_pos)
|
axis.append(ax_pos)
|
||||||
self.positions = np.array(list(zip(*axis)), dtype=float)
|
self.positions = np.array(list(zip(*axis)), dtype=float)
|
||||||
self.p_s('done')
|
self.p_s("done")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user