Add new logging method which writes to special data file to phoenix.PhoenixBL

This commit is contained in:
gac-x07mb
2024-08-27 17:49:27 +02:00
parent d815c24e27
commit f3f51d0a05
8 changed files with 197 additions and 661 deletions

View File

@ -1,345 +0,0 @@
from bec_lib import bec_logger
from ophyd import Component
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
TriggerSource,
)
from ophyd_devices.utils import bec_utils
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class DDGSetup(DDGCustomMixin):
"""
Mixin class for DelayGenerator logic at cSAXS.
At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
implemented in the ddg_config signals that are passed via the device config.
"""
def initialize_default_parameter(self) -> None:
"""Method to initialize default parameters."""
for ii, channel in enumerate(self.parent.all_channels):
self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
self.parent.set_channels("amplitude", self.parent.amplitude.get())
self.parent.set_channels("offset", self.parent.offset.get())
# Setup reference
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
)
self.parent.set_channels(
"reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
)
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# Set threshold level for ext. pulses
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
"""
Method to prepare scan logic of cSAXS
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
def check_scan_id(self) -> None:
"""
Method to check if scan_id has changed.
If yes, then it changes parent.stopped to True, which will stop further actions.
"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def finished(self) -> None:
"""Method checks if DDG finished acquisition"""
def on_pre_scan(self) -> None:
"""
Method called by pre_scan hook in parent class.
Executes trigger if premove_trigger is Trus.
"""
if self.parent.premove_trigger.get() is True:
self.parent.trigger_shot.put(1)
class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
"""
DG645 delay generator at cSAXS (multiple can be in use depending on the setup)
Default values for setting up DDG.
Note: checks of set calues are not (only partially) included, check manual for details on possible settings.
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
- delay_burst : (float >=0) Delay between trigger and first pulse in burst mode
- delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition
- additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)
- polarity : (list of 0/1) polarity for different channels
- amplitude : (float) amplitude voltage of TTLs
- offset : (float) offset for ampltitude
- thres_trig_level : (float) threshold of trigger amplitude
Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):
- set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.
# TODO trigger_width and fixed_ttl could be combined into single list.
- fixed_ttl_width : (list of either 1 or 0), one for each channel.
- trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.
- set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.
- premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).
- set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.
"""
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
delta_width = Component(
bec_utils.ConfigSignal, name="delta_width", kind="config", config_storage_name="ddg_config"
)
additional_triggers = Component(
bec_utils.ConfigSignal,
name="additional_triggers",
kind="config",
config_storage_name="ddg_config",
)
polarity = Component(
bec_utils.ConfigSignal, name="polarity", kind="config", config_storage_name="ddg_config"
)
fixed_ttl_width = Component(
bec_utils.ConfigSignal,
name="fixed_ttl_width",
kind="config",
config_storage_name="ddg_config",
)
amplitude = Component(
bec_utils.ConfigSignal, name="amplitude", kind="config", config_storage_name="ddg_config"
)
offset = Component(
bec_utils.ConfigSignal, name="offset", kind="config", config_storage_name="ddg_config"
)
thres_trig_level = Component(
bec_utils.ConfigSignal,
name="thres_trig_level",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_exposure = Component(
bec_utils.ConfigSignal,
name="set_high_on_exposure",
kind="config",
config_storage_name="ddg_config",
)
set_high_on_stage = Component(
bec_utils.ConfigSignal,
name="set_high_on_stage",
kind="config",
config_storage_name="ddg_config",
)
set_trigger_source = Component(
bec_utils.ConfigSignal,
name="set_trigger_source",
kind="config",
config_storage_name="ddg_config",
)
trigger_width = Component(
bec_utils.ConfigSignal,
name="trigger_width",
kind="config",
config_storage_name="ddg_config",
)
premove_trigger = Component(
bec_utils.ConfigSignal,
name="premove_trigger",
kind="config",
config_storage_name="ddg_config",
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
"""
Args:
prefix (str, optional): Prefix of the device. Defaults to "".
name (str): Name of the device.
kind (str, optional): Kind of the device. Defaults to None.
read_attrs (list, optional): List of attributes to read. Defaults to None.
configuration_attrs (list, optional): List of attributes to configure. Defaults to None.
parent (Device, optional): Parent device. Defaults to None.
device_manager (DeviceManagerBase, optional): DeviceManagerBase object. Defaults to None.
sim_mode (bool, optional): Simulation mode flag. Defaults to False.
ddg_config (dict, optional): Dictionary of ddg_config signals. Defaults to None.
"""
# Default values for ddg_config signals
self.ddg_config = {
# Setup default values
f"{name}_delay_burst": 0,
f"{name}_delta_width": 0,
f"{name}_additional_triggers": 0,
f"{name}_polarity": [1, 1, 1, 1, 1],
f"{name}_amplitude": 4.5,
f"{name}_offset": 0,
f"{name}_thres_trig_level": 2.5,
# Values for different behaviour during scans
f"{name}_fixed_ttl_width": [0, 0, 0, 0, 0],
f"{name}_trigger_width": None,
f"{name}_set_high_on_exposure": False,
f"{name}_set_high_on_stage": False,
f"{name}_set_trigger_source": "SINGLE_SHOT",
f"{name}_premove_trigger": False,
}
if ddg_config is not None:
# pylint: disable=expression-not-assigned
[self.ddg_config.update({f"{name}_{key}": value}) for key, value in ddg_config.items()]
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)

View File

@ -26,26 +26,31 @@ from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger
class LogTime():
#class LogTime():
def __init__(self):
self.t0=time.process_time()
# def __init__(self):
# self.t0=time.time()
def p_s(self,x):
now=time.process_time()
delta=now-self.t0
m=str(delta)+' sec '+x
logger.success(m)
self.t0=now
# def p_s(self,x):
# now=time.time()
# #delta=now-self.t0
# m=str(now)+' sec '+x
# logger.success(m)
# #self.t0=now
# file=open('MyLogfile.txt','a')
# file.write(m+'\n')
# file.close
ll=LogTime()
p_s=PhoenixBL.my_log
class DetectorInitError(Exception):
"""Raised when initiation of the device class fails,
due to missing device manager or not started in sim_mode."""
@ -211,7 +216,8 @@ class SetupDummy(CustomDetectorMixin):
Args:
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
timeout (float): timeout in seconds
check_stopped (bool): True if stopped flag should be checked
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
rue if stopped flag should be checked
interval (float): interval in seconds
all_signals (bool): True if all signals should be True, False if any signal should be True
exception_on_timeout (Exception): Exception to raise on timeout
@ -311,8 +317,13 @@ class Dummy_PSIDetector(PSIDetectorBase):
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
self.p_s=PhoenixBL.my_log #must be before super!!!
self.p_s('Dummy_device Dummy_PSIDetector.__init__ ')
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
ll.p_s('Dummy_device Dummy_PSIDetector.__init__ ')
self.stopped = False
self.name = name
self.service_cfg = None
@ -334,24 +345,28 @@ class Dummy_PSIDetector(PSIDetectorBase):
self._update_scaninfo()
self._update_filewriter()
self._init()
ll.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ')
#.. prepare my own log file
self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ')
def _update_filewriter(self) -> None:
"""Update filewriter with service config"""
ll.p_s('Dummy_device Dummy_PSIDetector._update_filewriter')
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter')
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
ll.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ')
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ')
def _update_scaninfo(self) -> None:
"""Update scaninfo from BecScaninfoMixing
This depends on device manager and operation/sim_mode
"""
ll.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo')
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo')
self.scaninfo = BecScaninfoMixin(self.device_manager)
self.scaninfo.load_scan_metadata()
ll.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ')
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ')
def _update_service_config(self) -> None:
"""Update service config from BEC service config
@ -361,31 +376,31 @@ class Dummy_PSIDetector(PSIDetectorBase):
# pylint: disable=import-outside-toplevel
from bec_lib.bec_service import SERVICE_CONFIG
ll.p_s('Dummy_device Dummy_PSIDetector._update_service_config')
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config')
if SERVICE_CONFIG:
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
return
self.service_cfg = {"base_path": os.path.abspath(".")}
ll.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done')
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done')
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
ll.p_s('Dummy_device Dummy_PSIDetector.check_scan_id')
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id')
old_scan_id = self.scaninfo.scan_id
self.scaninfo.load_scan_metadata()
if self.scaninfo.scan_id != old_scan_id:
self.stopped = True
ll.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ')
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ')
def _init(self) -> None:
"""Initialize detector, filewriter and set default parameters"""
ll.p_s('Dummy_device Dummy_PSIDetector._init')
self.p_s('Dummy_device Dummy_PSIDetector._init')
self.custom_prepare.on_init()
ll.p_s('Dummy_device Dummy_PSIDetector._init ... done ')
self.p_s('Dummy_device Dummy_PSIDetector._init ... done ')
def stage(self) -> list[object]:
@ -399,14 +414,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
list(object): list of objects that were staged
"""
ll.p_s('Dummy_device Dummy_PSIDetector.stage')
self.p_s('Dummy_device Dummy_PSIDetector.stage')
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.on_stage()
ll.p_s('Dummy_device Dummy_PSIDetector.stage done ')
self.p_s('Dummy_device Dummy_PSIDetector.stage done ')
return super().stage()
@ -418,22 +433,22 @@ class Dummy_PSIDetector(PSIDetectorBase):
time-critical actions. Therefore, it should also be kept as short/fast as possible.
I.e. Arming a detector in case there is a risk of timing out.
"""
ll.p_s('Dummy_device Dummy_PSIDetector.pre_scan')
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan')
self.custom_prepare.on_pre_scan()
ll.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ')
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ')
def trigger(self) -> DeviceStatus:
"""Trigger the detector, called from BEC."""
# pylint: disable=assignment-from-no-return
ll.p_s('Dummy_device Dummy_PSIDetector.trigger')
self.p_s('Dummy_device Dummy_PSIDetector.trigger')
status = self.custom_prepare.on_trigger()
if isinstance(status, DeviceStatus):
return status
ll.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ')
self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ')
return super().trigger()
@ -446,14 +461,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
"""
# pylint: disable=assignment-from-no-return
ll.p_s('Dummy_device Dummy_PSIDetector.complete')
self.p_s('Dummy_device Dummy_PSIDetector.complete')
status = self.custom_prepare.on_complete()
if isinstance(status, DeviceStatus):
return status
status = DeviceStatus(self)
status.set_finished()
ll.p_s('Dummy_device Dummy_PSIDetector.complete ... done ')
self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ')
return status
@ -469,11 +484,11 @@ class Dummy_PSIDetector(PSIDetectorBase):
Returns:
list(object): list of objects that were unstaged
"""
ll.p_s('Dummy_device Dummy_PSIDetector.unstage')
self.p_s('Dummy_device Dummy_PSIDetector.unstage')
self.check_scan_id()
self.custom_prepare.on_unstage()
self.stopped = False
ll.p_s('Dummy_device Dummy_PSIDetector.unstage .. done')
self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done')
return super().unstage()
@ -482,9 +497,9 @@ class Dummy_PSIDetector(PSIDetectorBase):
Stop the scan, with camera and file writer
"""
ll.p_s('Dummy_device Dummy_PSIDetector.stop')
self.p_s('Dummy_device Dummy_PSIDetector.stop')
self.custom_prepare.on_stop()
super().stop(success=success)
self.stopped = True
ll.p_s('Dummy_device Dummy_PSIDetector.stop ... done')
self.p_s('Dummy_device Dummy_PSIDetector.stop ... done')

View File

@ -1,182 +0,0 @@
from ophyd import (
ADComponent as ADCpt,
Device,
DeviceStatus,
)
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
import time
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the PHOENIX trigger setup.
"""
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
self._counter = 0
WW
def on_stage(self):
# is this called on each point in scan or just before scan ???
print('on stage')
self.parent.start_csmpl.put(0)
time.sleep(0.05)
cycles=self.parent.total_cycles.get()
time.sleep(0.05)
cycles=self.parent.total_cycles.put(0)
time.sleep(0.05)
cycles=self.parent.smpl.put(2)
time.sleep(0.5)
cycles=self.parent.total_cycles.put(cycles)
logger.success('PhoenixTrigger on stage')
def on_trigger(self):
self.parent.start_smpl.put(1)
time.sleep(0.05) # use blocking
logger.success('PhoenixTrigger on_trigger')
return self.wait_with_status(
[(self.parent.smpl_done.get, 1)])
# logger.success(' PhoenixTrigger on_trigger complete ')
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
def on_complete(self):
timeout =10
logger.success('XXXX complete %d XXXX' % success)
success = self.wait_for_signals(
[
(self.parent.smpl_done.get, 0))
],
timeout,
check_stopped=True,
all_signals=True
)
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
def on_stop(self):
logger.success(' PhoenixTrigger on_stop ')
self.parent.csmpl.put(1)
logger.success(' PhoenixTrigger on_stop finished ')
def on_unstage(self):
logger.success(' PhoenixTrigger on_unstage ')
self.parent.csmpl.put(1)
self.parent.smpl.put(1)
logger.success(' PhoenixTrigger on_unstage finished ')
class PhoenixTrigger(PSIDetectorBase):
"""
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector
mca (EpicsMCARecord) : MCA parameters for XMAP detector
hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector
MIN_READOUT (float) : Minimum readout time for the detector
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
To adress chanels such as 'X07MB-OP2:SMPL-DONE':
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
name,
kind=None,
parent=None,
device_manager=None,
**kwargs,
)
Docstring:
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: EpicsSignal
"""
custom_prepare_cls = PhoenixTriggerSetup
start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off
intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up
total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set
smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done

View File

@ -1,81 +1,81 @@
#from unittest import mock
import numpy as np
#import pandas
#import pytest
#from bec_lib import messages
#import device_server
#from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import FormattedComponent as FCpt
#from ophyd import Kind, PVPositioner, Signal
#from ophyd.flyers import FlyerInterface
#from ophyd.pv_positioner import PVPositionerComparator
#from ophyd.status import DeviceStatus, SubscriptionStatus
#from unittest import mock
import numpy as np
#import pandas
#import pytest
#from bec_lib import messages
#import device_server
#from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
#from ophyd import FormattedComponent as FCpt
#from ophyd import Kind, PVPositioner, Signal
#from ophyd.flyers import FlyerInterface
#from ophyd.pv_positioner import PVPositionerComparator
#from ophyd.status import DeviceStatus, SubscriptionStatus
import time as tt
#import ophyd
import os
import sys
#logger = bec_logger.logger
# load simulation
#bec.config.load_demo_config()
bec.config.update_session_with_file("config/config_1.yaml")
os.system('mv *.yaml tmp')
import time as tt
#import ophyd
import os
import sys
#logger = bec_logger.logger
# load simulation
#bec.config.load_demo_config()
bec.config.update_session_with_file("config/config_1.yaml")
os.system('mv *.yaml tmp')
class PhoenixBL:
#define some epics channels
def __init__(self):
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
self.ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
self.DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
self.SIG = Cpt(EpicsSignal,name='we',read_pv="X07MB-OP2-SAI_07:MEAN")
self.SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
self.CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
self.fielda =EpicsSignal(name='SMPL',read_pv='X07MB-SCAN:scan1.P1SP',write_pv='X07MB-SCAN:scan1.P1SP')
#end class
ph=PhoenixBL()
print('---------------------------------')
# scan will not diode
print(' SCAN DO NOT READ DIODE ')
dev.PH_curr_conf.readout_priority='baseline' # do not read detector
ti=tt.time_ns()
s1=scans.line_scan(dev.PH_ScanX_conf,0,0.002,steps=4,exp_time=.01,relative=False,delay=2)
tf=tt.time_ns()
print('elapsed time',(tf-ti)/1e9)
# scan will read diode
print(' SCAN READ DIODE ')
tt.sleep(2)
dev.PH_curr_conf.readout_priority='monitored' # read detector
s2=scans.line_scan(dev.PH_ScanX_conf,0,0.002,steps=11,exp_time=.3,relative=False,delay=2)
class PhoenixBL:
#define some epics channels
def __init__(self):
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
self.ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
self.ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
self.DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
self.SIG = Cpt(EpicsSignal,name='we',read_pv="X07MB-OP2-SAI_07:MEAN")
self.SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
self.CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
self.fielda =EpicsSignal(name='SMPL',read_pv='X07MB-SCAN:scan1.P1SP',write_pv='X07MB-SCAN:scan1.P1SP')
#end class
ph=PhoenixBL()
print('---------------------------------')
# scan will not diode
print(' SCAN DO NOT READ DIODE ')
dev.PH_curr_conf.readout_priority='baseline' # do not read detector
ti=tt.time_ns()
s1=scans.line_scan(dev.PH_ScanX_conf,0,0.002,steps=4,exp_time=.01,relative=False,delay=2)
tf=tt.time_ns()
print('elapsed time',(tf-ti)/1e9)
# scan will read diode
print(' SCAN READ DIODE ')
tt.sleep(2)
dev.PH_curr_conf.readout_priority='monitored' # read detector
s2=scans.line_scan(dev.PH_ScanX_conf,0,0.002,steps=11,exp_time=.3,relative=False,delay=2)
"""
next lines do not work as pandas is not installed on test system
next lines do not work as pandas is not installed on test system
res1 = s1.scan.to_pandas()
re1 = res1.to_numpy()
print('Scana')
print(res1)
print('')
print('Scan2 at pandas ')
print(res2)
print('Scan2 as numpy ')
res1 = s1.scan.to_pandas()
re1 = res1.to_numpy()
print('Scana')
print(res1)
print('')
print('Scan2 at pandas ')
print(res2)
print('Scan2 as numpy ')
print(res2)
"""
@ -83,4 +83,3 @@ print(res2)

View File

@ -28,7 +28,7 @@ time.sleep(1)
s1=scans.line_scan(dev.ScanX,0,0.1,steps=4,exp_time=.2,relative=False,delay=2)
s2=scans.phoenix_line_scan(dev.ScanX,0,0.002,steps=4,exp_time=.2,relative=False,delay=2)
s2=scans.phoenix_line_scan(dev.ScanX,0,0.1,steps=4,exp_time=.2,relative=False,delay=2)
res1 = s1.scan.to_pandas()
re1 = res1.to_numpy()

View File

@ -1,3 +1,3 @@
import phoenix_bec.scripts.phoenix as PH
w=PH.PhGroup('labelName')
w.linescan2group(s1)
w.linescan2group(s1)

View File

@ -61,6 +61,8 @@ from bec_server.scan_server.scans import ScanBase, ScanArgType
import numpy as np
import time
from bec_lib.logger import bec_logger
from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger
@ -68,16 +70,21 @@ logger = bec_logger.logger
class LogTime():
def __init__(self):
self.t0=time.process_time()
logger.success('init LogTime')
self.t0=time.time()
def p_s(self,x):
now=time.process_time()
delta=now-self.t0
m=str(delta)+' sec '+x
now=time.time()
#delta=now-self.t0
m=str(now)+' sec '+x
logger.success(m)
self.t0=now
#self.t0=now
file=open('MyLogfile.txt','a')
file.write(m+'\n')
file.close
ll=LogTime()
class PhoenixScanBaseTTL(ScanBase):
@ -86,19 +93,19 @@ class PhoenixScanBaseTTL(ScanBase):
"""
ll.p_s('enter scripts.phoenix.scans.PhoenixScanBaseTTL')
def scan_core(self):
"""perform the scan core procedure"""
ll.p_s('PhoenixScanBaseTT.scan_core')
self.p_s('PhoenixScanBaseTT.scan_core')
for ind, pos in self._get_position():
for self.burst_index in range(self.burst_at_each_point):
ll.p_s('PhoenixScanBaseTT.scan_core in loop ')
self.p_s('PhoenixScanBaseTT.scan_core in loop ')
yield from self._at_each_point(ind, pos)
self.burst_index = 0
def _at_each_point(self, ind=None, pos=None):
ll.p_s('PhoenixScanBaseTT._at_each_point')
self.p_s('PhoenixScanBaseTT._at_each_point')
yield from self._move_scan_motors_and_wait(pos)
if ind > 0:
yield from self.stubs.wait(
@ -115,11 +122,12 @@ class PhoenixScanBaseTTL(ScanBase):
)
self.point_id += 1
ll.p_s('done')
self.p_s('done')
class PhoenixLineScan(PhoenixScanBaseTTL):
ll.p_s('enter scripts.phoenix.scans.PhoenixLineScan')
scan_name = "phoenix_line_scan"
required_kwargs = ["steps", "relative"]
arg_input = {
@ -155,7 +163,11 @@ class PhoenixLineScan(PhoenixScanBaseTTL):
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
"""
ll.p_s('init scripts.phoenix.scans.PhoenixLineScan')
#from phoenix_bec.scripts.phoenix import PhoenixBL
self.p_s=PhoenixBL.my_log
self.p_s('init scripts.phoenix.scans.PhoenixLineScan')
super().__init__(
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
)
@ -163,14 +175,14 @@ ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, rela
self.setup_device = setup_device
time.sleep(1)
ll.p_s('done')
self.p_s('done')
def _calculate_positions(self) -> None:
ll.p_s('PhoenixLineScan._calculate_positions')
self.p_s('PhoenixLineScan._calculate_positions')
axis = []
for _, val in self.caller_args.items():
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
axis.append(ax_pos)
self.positions = np.array(list(zip(*axis)), dtype=float)
ll.p_s('done')
self.p_s('done')

View File

@ -1,7 +1,7 @@
#from unittest import mock
import os
import sys
import time as tt
import time
import numpy as np
#import pandas
@ -37,6 +37,7 @@ class PhoenixBL():
# General class for PHOENIX beamline located in phoenix_bec/phoenic_bec/scripts
#
"""
t0=time.time()
def __init__(self):
"""
init PhoenixBL() in phoenix_bec/scripts
@ -56,6 +57,8 @@ class PhoenixBL():
self.path_phoenix_bec ='/data/test/x07mb-test-bec/bec_deployment/phoenix_bec/'
self.path_devices = self.path_phoenix_bec + 'phoenix_bec/device_configs/' # local yamal file
self.file_devices_file = self.path_phoenix_bec + 'phoenix_bec/device_configs/phoenix_devices.yaml' # local yamal file
self.t0=time.time()
def read_local_phoenix_config(self):
print('read file ')
@ -83,6 +86,26 @@ class PhoenixBL():
print(self.path_phoenix_bec)
os.system('cat '+self.path_phoenix_bec+'phoenix_bec/scripts/Current_setup.txt')
@classmethod
def my_log(cls,x):
"""
class method allows to write a user defined log file
time is seconds relative to some point max 10 minutes ago
"""
print(time.time())
now = time.time() - (86400*(time.time()//86400))
now = now - 3600.*(now//3600.)
now = now - 600.*(now//600.)
m=str(now)+' sec '+x
logger.success(m)
file=open('MyLogfile.txt','a')
file.write(m+'\n')
file.close
@ -90,23 +113,24 @@ class PhoenixBL():
class PhGroup():
"""
Class to create data groups
with attributes prvidws as string
compatible with larch groups
call by
initialize by
ww=MakeGroup('YourName')
ww=PhGroup('YourLabel')
it creates a group
with default attributes
ww.GroupName='YourName'
ww.label = 'YourLabel' --- for compatibility with larch groups
ww.description =YourLabel'
To add further data use for example by
Further data can be added with new tags by
ww.newtag=67
or use meth
ww.keys() -- list all keys
ww.linescan2group -- converts bec linescan data to group format
"""
@ -169,6 +193,17 @@ class PhGroup():
def linescan2group(self,this_scan):
"""
method merges results of linescan into group and
creates for each data a numpy variable constructed as
group_name.{device_name}_{variable_name}_val (for value )
group_name.{device_name}_{variable_name}_ts (for timestamp )
"""
print('keys')
print(this_scan.scan.data.keys())
for outer_key in this_scan.scan.data.keys():
@ -192,6 +227,8 @@ class PhGroup():
#endfor
self.add(inner_key+'_'+ outer_key+'_val',value)
self.add(inner_key+'_'+ outer_key+'_ts',timestamp)
#endfor
#endfor
#endfor
#enddef