From 49da8b32da4ea5e70ec590f266a8c0133526a93f Mon Sep 17 00:00:00 2001 From: mohacsi_i Date: Tue, 27 Feb 2024 15:32:59 +0100 Subject: [PATCH] More cleanup with testing --- .../epics/devices/AerotechAutomation1.py | 2007 +++++++---------- 1 file changed, 845 insertions(+), 1162 deletions(-) diff --git a/ophyd_devices/epics/devices/AerotechAutomation1.py b/ophyd_devices/epics/devices/AerotechAutomation1.py index aa6a255..d683640 100644 --- a/ophyd_devices/epics/devices/AerotechAutomation1.py +++ b/ophyd_devices/epics/devices/AerotechAutomation1.py @@ -1,1213 +1,896 @@ -from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal -from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus -from ophyd.flyers import FlyerInterface -from time import sleep -import warnings -import numpy as np -import time +# -*- coding: utf-8 -*- +""" +Created on Thu Nov 3 15:54:08 2022 -from .AerotechAutomation1Enums import (DataCollectionMode, DataCollectionFrequency, - AxisDataSignal, PsoWindowInput, DriveDataCaptureInput, DriveDataCaptureTrigger, - TaskDataSignal, SystemDataSignal, TomcatSequencerState) +@author: mohacsi_i +""" +import os +os.environ["EPICS_CA_ADDR_LIST"]="129.129.144.255" -from .AerotechAutomation1Enums import * -from typing import Union from collections import OrderedDict +import warnings +import numpy as np +from time import sleep, time +import unittest +from ophyd import EpicsMotor, EpicsSignal, EpicsSignalRO -#class EpicsMotorX(EpicsMotor): -# pass - - -class EpicsMotorX(EpicsMotor): - SUB_PROGRESS = "progress" - - def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self._startPosition = None - self._targetPosition = None - self.subscribe(self._progress_update, run=False) - - def configure(self, d: dict): - if "target" in d: - self._targetPosition = d['target'] - if "position" in d: - self._targetPosition = d['position'] - #super().configure(d) - - def kickoff(self): - self._startPosition = float( self.position) - return self.move(self._targetPosition, wait=False) - - def _progress_update(self, value, **kwargs) -> None: - """Progress update on the scan""" - if (self._startPosition is None) or (self._targetPosition is None) or (not self.moving): - self._run_subs( sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1, ) - return - - progress = np.abs( (value-self._startPosition)/(self._targetPosition-self._startPosition) ) - max_value = 100 - self._run_subs( - sub_type=self.SUB_PROGRESS, - value=int(100*progress), max_value=max_value, done=int(np.isclose(max_value, progress, 1e-3)), ) +try: + from AerotechAutomation1 import (aa1Controller, aa1Tasks, aa1TaskState, aa1DataAcquisition, + aa1GlobalVariables, aa1AxisIo, aa1AxisDriveDataCollection, + aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisPsoWindow) + from AerotechAutomation1Enums import AxisDataSignal, SystemDataSignal, DriveDataCaptureInput, DriveDataCaptureTrigger +except Exception: + from .AerotechAutomation1 import (aa1Controller, aa1Tasks, aa1TaskState, aa1DataAcquisition, + aa1GlobalVariables, aa1AxisIo, aa1AxisDriveDataCollection, + aa1GlobalVariableBindings, aa1AxisPsoDistance, aa1AxisPsoWindow) + from .AerotechAutomation1Enums import AxisDataSignal, SystemDataSignal, DriveDataCaptureInput, DriveDataCaptureTrigger +AA1_IOC_NAME = "X02DA-ES1-SMP1" +AA1_AXIS_NAME = "ROTY" +class AerotechAutomation1Test(unittest.TestCase): + _isConnected = False -def epicsCharArray2String(raw: bytes) -> str: - """ Ophyd might convert strings to uint16 vector...""" - if isinstance(raw, np.ndarray): - raw = raw[:-1] if raw[-1]==0 else raw - sReply = ''.join(str(s, encoding='ASCII') for s in raw.astype(np.uint8)) - #print(f"Raw reply: {raw}\nConverted reply: {sReply}") - return sReply - else: - return raw - -class EpicsSignalPassive(Device): - value = Component(EpicsSignalRO, "", kind=Kind.omitted) - proc = Component(EpicsSignal, ".PROC", kind=Kind.omitted, put_complete=True) - def get(self): - self.proc.set(1).wait() - return self.value.get() - - -class EpicsPassiveRO(EpicsSignalRO): - """Special helper class to work around a bug in ophyd (caproto backend) - that reads CHAR array strigs as uint16 arrays. - """ + @classmethod + def setUpClass(cls): + try: + # HIL testing only if there's hardware + dev = aa1Controller(AA1_IOC_NAME+":CTRL:", name="aa1") + dev.wait_for_connection() + cls._isConnected = True + print("\n") + except Exception as ex: + print(ex) - def __init__(self, read_pv, *, string=False, name=None, **kwargs): - super().__init__(read_pv, string=string, name=name, **kwargs) - self._proc = EpicsSignal(read_pv+".PROC", kind=Kind.omitted, put_complete=True) + def test_00_Instantiation(self): + """ Test instantiation and interfaces - def wait_for_connection(self, *args, **kwargs): - super().wait_for_connection(*args, **kwargs) - self._proc.wait_for_connection(*args, **kwargs) - - def get(self, *args, **kwargs): - self._proc.set(1).wait() - return super().get(*args, **kwargs) - - @property - def value(self): - return super().value - - - - -class EpicsStringPassiveRO(EpicsSignalRO): - """Special helper class to work around a bug in ophyd (caproto backend) - that reads CHAR array strigs as uint16 arrays. - """ - - def __init__(self, read_pv, *, string=False, name=None, **kwargs): - super().__init__(read_pv, string=string, name=name, **kwargs) - self._proc = EpicsSignal(read_pv+".PROC", kind=Kind.omitted, put_complete=True) - - def wait_for_connection(self, *args, **kwargs): - super().wait_for_connection(*args, **kwargs) - self._proc.wait_for_connection(*args, **kwargs) - - def get(self, *args, **kwargs): - self._proc.set(1).wait() - raw = super().get(*args, **kwargs) - return epicsCharArray2String(raw) - - @property - def value(self): - raw = super().value - return epicsCharArray2String(raw) - - -class EpicsStringPassive(EpicsSignal): - """Special helper class to work around a bug in ophyd (caproto backend) - that reads CHAR array strigs as uint16 arrays. - """ - def __init__(self, read_pv, write_pv=None, *, put_complete=False, - string=False, limits=False, name=None, **kwargs): - super().__init__(read_pv, write_pv, put_complete, string, limits, name, **kwargs) - self._proc = EpicsSignal(read_pv+".PROC", kind=Kind.omitted, put_complete=True) - - def wait_for_connection(self, *args, **kwargs): - super().wait_for_connection(*args, **kwargs) - self._proc.wait_for_connection(*args, **kwargs) - - def get(self, *args, **kwargs): - self._proc.set(1).wait() - raw = super().get(*args, **kwargs) - return epicsCharArray2String(raw) - - @property - def value(self): - raw = super().value - return epicsCharArray2String(raw) - - -class EpicsString(EpicsSignal): - """Special helper class to work around a bug in ophyd (caproto backend) - that reads CHAR array strigs as uint16 arrays. - """ - def get(self, *args, **kwargs): - raw = super().get(*args, **kwargs) - return epicsCharArray2String(raw) - - @property - def value(self): - raw = super().value - return epicsCharArray2String(raw) - - -class EpicsStringRO(EpicsSignalRO): - """Special helper class to work around a bug in ophyd (caproto backend) - that reads CHAR array strigs as uint16 arrays. - """ - def get(self, *args, **kwargs): - raw = super().get(*args, **kwargs) - return epicsCharArray2String(raw) - - @property - def value(self): - raw = super().value - return epicsCharArray2String(raw) - - -class aa1Controller(Device): - """Ophyd proxy class for the Aerotech Automation 1's core controller functionality""" - controllername = Component(EpicsSignalRO, "NAME", kind=Kind.config) - serialnumber = Component(EpicsSignalRO, "SN", kind=Kind.config) - apiversion = Component(EpicsSignalRO, "API_VERSION", kind=Kind.config) - axiscount = Component(EpicsSignalRO, "AXISCOUNT", kind=Kind.config) - taskcount = Component(EpicsSignalRO, "TASKCOUNT", kind=Kind.config) - fastpoll = Component(EpicsSignalRO, "POLLTIME", auto_monitor=True, kind=Kind.hinted) - slowpoll = Component(EpicsSignalRO, "DRVPOLLTIME", auto_monitor=True, kind=Kind.hinted) - def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - -class aa1Tasks(Device): - """ Task management API - - The place to manage tasks and AeroScript user files on the controller. - You can read/write/compile/execute AeroScript files and also retrieve - saved data files from the controller. - - Execute does not require to store the script in a file, it will compile - it and run it directly on a certain thread. But there's no way to - retrieve the source code. - """ - _failure = Component(EpicsSignalRO, "FAILURE", kind=Kind.hinted) - errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.hinted) - warnStatus = Component(EpicsSignalRO, "WARNW", auto_monitor=True, kind=Kind.hinted) - taskIndex = Component(EpicsSignal, "TASKIDX", kind=Kind.config, put_complete=True) - switch = Component(EpicsSignal, "SWITCH", kind=Kind.config, put_complete=True) - _execute = Component(EpicsSignal, "EXECUTE", string=True, kind=Kind.config, put_complete=True) - _executeMode = Component(EpicsSignal, "EXECUTE-MODE", kind=Kind.config, put_complete=True) - _executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True) - - fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True) - _fileRead = Component(EpicsPassiveRO, "FILEREAD", string=True, kind=Kind.omitted) - _fileWrite = Component(EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True) - - def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): - """ __init__ MUST have a full argument list""" - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self._currentTaskMonitor = None - self._textToExecute = None - self._isConfigured = False - - def readFile(self, filename: str) -> str: - """ Read a file from the controller """ - # Have to use CHAR array due to EPICS LSI bug... - self.fileName.set(filename).wait() - filebytes = self._fileRead.get() - # C-strings terminate with trailing zero - if filebytes[-1]==0: - filebytes = filebytes[:-1] - filetext = filebytes - return filetext - - def writeFile(self, filename: str, filetext: str) -> None: - """ Write a file to the controller """ - self.fileName.set(filename).wait() - self._fileWrite.set(filetext).wait() - - def runScript(self, filename: str, taskIndex: int==2, filetext=None, settle_time=0.5) -> None: - """ Run a script file that either exists, or is newly created and compiled""" - - self.configure({'text': filetext, 'filename': filename, 'taskIndex': taskIndex}) - self.trigger().wait() - - def execute(self, text: str, taskIndex: int=3, mode: str=0, settle_time=0.5): - """ Run a short text command on the Automation1 controller""" - - print(f"Executing program on task: {taskIndex}") - self.configure({'text': text, 'taskIndex': taskIndex, 'mode': mode}) - self.trigger().wait() - - if mode in [0, "None", None]: - return None - else: - raw = self._executeReply.get() - return epicsCharArray2String(raw) - - def configure(self, d: dict={}) -> tuple: - """ Configuration interface for flying + This method tests the more abstract bluesky compatibility of the + aerotech ophyd devices. In particular it checks the existence of the + required schema functions (that are not called in other tests). """ - # Unrolling the configuration dict - text = str(d['text']) if 'text' in d else None - filename = str(d['filename']) if 'filename' in d else None - taskIndex = int(d['taskIndex']) if 'taskIndex' in d else 4 - settle_time = float(d['settle_time']) if 'settle_time' in d else None - mode = d['mode'] if 'mode' in d else None - - # Validation - if taskIndex < 1 or taskIndex > 31: - raise RuntimeError(f"Invalid task index: {taskIndex}") - if (text is None) and (filename is None): - raise RuntimeError("Task execution requires either AeroScript text or filename") + # Throws if IOC is not available or PVs are incorrect + def classTestSequence(DeviceClass, prefix, config=None, setable=False, set_value=None, flyable=False): + dut = DeviceClass(prefix, name="dut") + dut.wait_for_connection() - # Common operations - old = self.read_configuration() - self.taskIndex.set(taskIndex).wait() - self._textToExecute = None - #self._currentTaskMonitor = aa1TaskState() + # Readable device (required) + dut.describe() # Describes read() + dut.read() # Read the device active values + dut.trigger() + dut.describe_configuration() # Describes read_configuration() + dut.read_configuration() # Reads device configuration + if config is not None: + try: + ret = dut.configure(config) + self.assertTrue(type(ret)==tuple, f"On device class {DeviceClass}: configure is expected to return a tuple") + self.assertTrue(isinstance(ret[0], OrderedDict), f"On device class {DeviceClass}: old configuration is expected to be an OrderedDict") + self.assertTrue(isinstance(ret[1], (dict, OrderedDict)), f"On device class {DeviceClass}: new configuration is expected to be dict or OrderedDict") + except Exception: + pass + # Readable device (optional) + dut.stage() + dut.unstage() + # Setable device + if setable and set_value is not None: + try: + st = dut.set(set_value) + #dut.stop() + dut.check_value(set_value) + dut.position + except Exception as ex: + warnings.warn(f"Exception occured while testing setable interface: {ex}") + if flyable: + try: + dut.configure(config) + st = dut.kickoff() + st = dut.complete() + ret = dut.describe_collect() + ret = dut.collect() + except Exception: + pass + + classDBase = [ + (aa1Controller, f"{AA1_IOC_NAME}:CTRL:"), + (aa1Tasks, f"{AA1_IOC_NAME}:TASK:"), + (aa1DataAcquisition, f"{AA1_IOC_NAME}:DAQ:"), + (aa1GlobalVariables, f"{AA1_IOC_NAME}:VAR:"), + (aa1GlobalVariableBindings, f"{AA1_IOC_NAME}:VAR:"), + + (EpicsMotor, f"{AA1_IOC_NAME}:{AA1_AXIS_NAME}"), + (aa1AxisIo, f"{AA1_IOC_NAME}:{AA1_AXIS_NAME}:IO:"), + (aa1AxisDriveDataCollection, f"{AA1_IOC_NAME}:{AA1_AXIS_NAME}:DDC:"), + (aa1AxisPsoDistance, f"{AA1_IOC_NAME}:{AA1_AXIS_NAME}:PSO:"), + (aa1AxisPsoWindow, f"{AA1_IOC_NAME}:{AA1_AXIS_NAME}:PSO:"), + ] - # Choose the right execution mode - if (filename is None) and (text not in [None, ""]): - # Direct command execution - print("Preparing for direct command execution") - if mode is not None: - self._executeMode.set(mode).wait() - self._textToExecute = text - elif (filename is not None) and (text in [None, ""]): - # Execute existing file - self.fileName.set(filename).wait() - self.switch.set("Load").wait() - elif (filename is not None) and (text not in [None, ""]): - print("Preparing to execute via intermediary file") - # Execute text via intermediate file - self.taskIndex.set(taskIndex).wait() - self.fileName.set(filename).wait() - self._fileWrite.set(text).wait() - self.switch.set("Load").wait() - self._textToExecute = None - else: - raise RuntimeError("Unsupported filename-text combo") - self._isConfigured = True - - new = self.read_configuration() - return (old, new) - - ########################################################################## - # Bluesky stepper interface - def stage(self) -> None: - """ Default staging """ - print("Staging") - super().stage() - def unstage(self) -> None: - """ Default unstaging """ - print("Unstaging") - super().unstage() + print("BSKY: Testing basic Ophyd device class interfaces") + for DeviceClass, prefix in classDBase: + classTestSequence(DeviceClass, prefix) + print("BSKY: Done testing CTRL module!") - def trigger(self, settle_time=0.2) -> Status: - """ Execute the script on the configured task""" - print("Triggering") - if self._isConfigured: - if self._textToExecute is not None: - status = self._execute.set(self._textToExecute, settle_time=settle_time) - else: - status = self.switch.set("Run", settle_time=settle_time) - else: - status = Status() - status.set_finished() - return status - def stop(self): - """ Stop the currently selected task """ - print(type(self.switch)) - self.switch.set("Stop").wait() - - ########################################################################## - # Flyer interface - def kickoff(self) -> DeviceStatus: - """ Execute the script on the configured task""" - print("Kickoff") - if self._isConfigured: - if self._textToExecute is not None: - status = self._execute.set(self._textToExecute, settle_time=0.5) - else: - status = self.switch.set("Run", settle_time=0.1) - else: - status = DeviceStatus(self) - status.set_finished() - return status - - def complete(self) -> DeviceStatus: - """ Execute the script on the configured task""" - print("Complete") - #return self._currentTaskMonitor.complete() - status = DeviceStatus(self) - status.set_finished() - return status - def describe_collect(self) -> OrderedDict: - dd = OrderedDict() - dd['success'] = {'source': "internal", 'dtype': 'integer', 'shape': [], 'units': '', 'lower_ctrl_limit': 0, 'upper_ctrl_limit': 0} - return {self.name: dd} + def test_01_ControllerProxy(self): + # Throws if IOC is not available or PVs are incorrect + dev = aa1Controller(AA1_IOC_NAME+":CTRL:", name="aa1") + dev.wait_for_connection() - def collect(self) -> OrderedDict: - print("Collecting") - ret = OrderedDict() - ret["timestamps"] = {"success": time.time()} - ret["data"] = {"success": 1} - yield ret + print("CTRL: Testing basic controller readback") + # Check if controller info was loaded correctly + self.assertTrue(len(dev.controllername.get())>0, "Controller name seems invalid") + self.assertTrue(len(dev.serialnumber.get())>0, "Controller serial number seems invalid") + self.assertTrue(dev.axiscount.get()>0, "Maximum controller axis count seems invalid") + self.assertTrue(dev.taskcount.get()>0, "Maximum controller task count seems invalid") + print("CTRL: Done testing CTRL module!") + + + def test_02_TaskProxy(self): + # Throws if IOC is not available or PVs are incorrect + dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + dev.wait_for_connection() + + print("\nTASK: Testing the 'execute' command (direct string execution)") + # Attempt to run a short test script using the Execute interface (real) + text = "$rreturn[0]=3.141592" + rbv = dev.execute(text, mode="Double", taskIndex=3) + + self.assertTrue(dev._executeMode.get() in ["Double", 3], f"Expecting double return type for execution, but got: {dev._executeMode.get()}") + self.assertTrue(dev.taskIndex.get()==3, "Execution must happen on task 3") + self.assertTrue(rbv=="Result (double): 3.141592", f"Unexpected reply from execution: {rbv}") + # Attempt to run a short test script using the Execute interface (integer) + text = "$ireturn[0]=42" + rbv = dev.execute(text, mode="Int", taskIndex=4) + self.assertTrue(dev.taskIndex.get()==4, "Execution must happen on task 1") + self.assertTrue(rbv=="Result (int): 42", f"Unexpected reply from execution: {rbv}") + + print("TASK: Testing the 'file' interface") + # Write and read back a file + filename = "unittesting.ascript" + text = "program\nvar $positions[2] as real\nend" + if len(text)>40: + print(f"\tWARN: Program text is too long for EPICS string: {len(text)}") + + dev.writeFile(filename, text) + contents = dev.readFile(filename) + self.assertTrue(contents==text, "File contents changed on readback") + + print("TASK: Testing the 'run' command (run an existing file)") + # Send another empty program + filename = "unittesting2.ascript" + text = "program\nvar $positions[2] as real\nend" + dev.runScript(filename, taskIndex=3, filetext=text) + + print("TASK: Testing bluesky interface with free text)") + dev.configure({'text': "$rreturn[0]=137.036", 'taskIndex': 4, 'mode': "Double"}) + dev.kickoff().wait() + dev.complete().wait() + rbv = dev._executeReply.get() + self.assertTrue(dev._executeMode.get() in ["Double", 3], f"Expecting double return type for execution, but got: {dev._executeMode.get()}") + self.assertTrue(dev.taskIndex.get()==4, "Execution must happen on task 4") + self.assertTrue(rbv=="Result (double): 137.036000", f"Unexpected reply from execution: {rbv}") + print("TASK: Done testing TASK module!") -class aa1TaskState(Device): - """ Task state monitoring API - - This is the task state monitoring interface for Automation1 tasks. It - does not launch execution, but can wait for the execution to complete. - """ - index = Component(EpicsSignalRO, "INDEX", kind=Kind.config) - status = Component(EpicsSignalRO, "STATUS", auto_monitor=True, kind=Kind.hinted) - errorCode = Component(EpicsSignalRO, "ERROR", auto_monitor=True, kind=Kind.hinted) - warnCode = Component(EpicsSignalRO, "WARNING", auto_monitor=True, kind=Kind.hinted) - - def complete(self) -> StatusBase: - """ Bluesky flyer interface""" - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - def notRunning(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_== 0) else (value not in ["Running", 4]) - timestamp_ = timestamp - return result + def test_03_DataCollectionProxy(self): + """ This tests the controller data collection - # Subscribe and wait for update - status = SubscriptionStatus(self.status, notRunning, settle_time=0.5) - return status - - def kickoff(self) -> DeviceStatus: - status = DeviceStatus(self) - status.set_finished() - return status - - def describe_collect(self) -> OrderedDict: - dd = OrderedDict() - dd['success'] = {'source': "internal", 'dtype': 'integer', 'shape': [], 'units': '', 'lower_ctrl_limit': 0, 'upper_ctrl_limit': 0} - return dd - - def collect(self) -> OrderedDict: - ret = OrderedDict() - ret["timestamps"] = {"success": time.time()} - ret["data"] = {"success": 1} - yield ret - - -class aa1DataAcquisition(Device): - """ Controller Data Acquisition - DONT USE at Tomcat - - This class implements the controller data collection feature of the - Automation1 controller. This feature logs various inputs at a - **fixed frequency** from 1 kHz up to 200 kHz. - Usage: - 1. Start a new configuration with "startConfig" - 2. Add your signals with "addXxxSignal" - 3. Start your data collection - 4. Read back the recorded data with "readback" - """ - # Status monitoring - status = Component(EpicsSignalRO, "RUNNING", auto_monitor=True, kind=Kind.hinted) - points_max = Component(EpicsSignal, "MAXPOINTS", kind=Kind.config, put_complete=True) - signal_num = Component(EpicsSignalRO, "NITEMS", kind=Kind.config) - - points_total = Component(EpicsSignalRO, "NTOTAL", auto_monitor=True, kind=Kind.hinted) - points_collected = Component(EpicsSignalRO, "NCOLLECTED", auto_monitor=True, kind=Kind.hinted) - points_retrieved= Component(EpicsSignalRO, "NRETRIEVED", auto_monitor=True, kind=Kind.hinted) - overflow = Component(EpicsSignalRO, "OVERFLOW", auto_monitor=True, kind=Kind.hinted) - runmode = Component(EpicsSignalRO, "MODE_RBV", auto_monitor=True, kind=Kind.hinted) - # DAQ setup - numpoints = Component(EpicsSignal, "NPOINTS", kind=Kind.config, put_complete=True) - frequency = Component(EpicsSignal, "FREQUENCY", kind=Kind.config, put_complete=True) - _configure = Component(EpicsSignal, "CONFIGURE", kind=Kind.omitted, put_complete=True) - - def startConfig(self, npoints: int, frequency: DataCollectionFrequency): - self.numpoints.set(npoints).wait() - self.frequency.set(frequency).wait() - self._configure.set("START").wait() - - def clearConfig(self): - self._configure.set("CLEAR").wait() - - srcTask = Component(EpicsSignal, "SRC_TASK", kind=Kind.config, put_complete=True) - srcAxis = Component(EpicsSignal, "SRC_AXIS", kind=Kind.config, put_complete=True) - srcCode = Component(EpicsSignal, "SRC_CODE", kind=Kind.config, put_complete=True) - _srcAdd = Component(EpicsSignal, "SRC_ADD", kind=Kind.omitted, put_complete=True) - - def addAxisSignal(self, axis: int, code: int) -> None: - """ Add a new axis-specific data signal to the DAQ configuration. The - most common signals are PositionFeedback and PositionError. + Test for testing the controller data collection that occurs at a + fixed frequency. This interface is not so important if there's + hardware synchronization. """ - self.srcAxis.set(axis).wait() - self.srcCode.set(code).wait() - self._srcAdd.set("AXIS").wait() - - def addTaskSignal(self, task: int, code: int) -> None: - """ Add a new task-specific data signal to the DAQ configuration""" - self.srcTask.set(task).wait() - self.srcCode.set(code).wait() - self._srcAdd.set("TASK").wait() - - def addSystemSignal(self, code: int) -> None: - """ Add a new system data signal to the DAQ configuration. The most - common signal is SampleCollectionTime. """ - self.srcCode.set(code).wait() - self._srcAdd.set("SYSTEM").wait() - - # Starting / stopping the DAQ - _mode = Component(EpicsSignal, "MODE", kind=Kind.config, put_complete=True) - _switch = Component(EpicsSignal, "SET", kind=Kind.omitted, put_complete=True) - - def start(self, mode=DataCollectionMode.Snapshot) -> None: - """ Start a new data collection """ - self._mode.set(mode).wait() - self._switch.set("START").wait() + return + # Throws if IOC is not available or PVs are incorrect + dev = aa1DataAcquisition(AA1_IOC_NAME+":DAQ:", name="daq") + dev.wait_for_connection() - def stop(self) -> None: - """ Stop a running data collection """ - self._switch.set("STOP").wait() + print("\nDAQ: Testing the controller configuration") + self.assertTrue(dev.status.get()==0, "DAQ must be idle to start the tests") + self.assertTrue(dev.points_max.get()>30000, "The test requires at least 30000 point buffer") - def run(self, mode=DataCollectionMode.Snapshot) -> None: - """ Start a new data collection """ - self._mode.set(mode).wait() - self._switch.set("START").wait() - # Wait for finishing acquisition - # Note: this is very bad blocking sleep - while self.status.value!=0: - sleep(0.1) - sleep(0.1) + ###################################################################### + # Test configuration 1 + print("DAQ: Running a short test run") + dev.startConfig(3000, "1kHz") + dev.addSystemSignal(SystemDataSignal.DataCollectionSampleTime) + dev.addAxisSignal(0, AxisDataSignal.PositionFeedback) + self.assertTrue(dev.signal_num.get()==2, "Expected 2 data sources") + # Run the data acquisition (run blocks and returns array) + _ = dev.run() + # Check post-run parameter values + self.assertTrue(dev.points_total.get()==3000, "Expected to have 3k points") + self.assertTrue(dev.points_collected.get()==3000, "Expected to record 3k points, did {dev.points_collected.value}") + + print("DAQ: Checking the returned array") + arr = dev.dataReadBack() + self.assertTrue(arr.size==6000, f"Unexpected data array size {arr.size}") + self.assertTrue(arr.shape==(3000,2), f"Unexpected data array shape {arr.size}") + + ###################################################################### + # Test manual configuration 2 + print("DAQ: Running a short test run") + dev.startConfig(2800, "1kHz") + dev.addSystemSignal(SystemDataSignal.DataCollectionSampleTime) + dev.addAxisSignal(0, AxisDataSignal.PositionFeedback) + dev.addAxisSignal(0, AxisDataSignal.VelocityFeedback) + self.assertTrue(dev.signal_num.get()==3, "Expected 3 data sources") + # Run the data acquisition (run blocks and returns array) + _ = dev.run() + # Check post-run parameter values + self.assertTrue(dev.points_total.get()==2800, f"Expected to have 2.8k points, got {dev.points_total.get()}") + self.assertTrue(dev.points_collected.get()==2800, f"Expected to record 2.8k points, got {dev.points_collected.get()}") + + print("DAQ: Checking the returned array") + arr = dev.dataReadBack() + self.assertTrue(arr.size==8400, f"Unexpected data array size {arr.size}") + self.assertTrue(arr.shape==(2800,3), f"Unexpected data array shape {arr.size}") + print("DAQ: Done testing DAQ module!") + + + def test_04_GlobalVariables1(self): + """Test the basic read/write global variable interface + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1GlobalVariables(AA1_IOC_NAME+":VAR:", name="gvar") + polled = aa1GlobalVariableBindings(AA1_IOC_NAME+":VAR:", name="pvar") + dev.wait_for_connection() + polled.wait_for_connection() + + print("\nVAR: Checking available memory") + self.assertTrue(dev.num_int.get()>=32, "At least 32 integer variables are needed for this test") + self.assertTrue(dev.num_real.get()>=32, "At least 32 real variables are needed for this test") + self.assertTrue(dev.num_string.get()>=32, "At least 32 string[256] variables are needed for this test") + + print("VAR: Setting and checking a few variables") + # Use random variables for subsequent runs + val_f = 100*np.random.random() + dev.writeFloat(11, val_f) + rb_f = dev.readFloat(11) + self.assertEqual(val_f, rb_f, f"Floating point readback value changed, read {rb_f} instead of {val_f}") + rb_f = dev.readFloat(10, 4) + self.assertEqual(val_f, rb_f[1], f"Floating point array readback value changed, read {rb_f[1]} instead of {val_f}") + + val_i = 1+int(100*np.random.random()) + dev.writeInt(13, val_i) + rb_i = dev.readInt(13) + self.assertEqual(val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}") + rb_i = dev.readInt(10, 4) + self.assertEqual(val_i, rb_i[3], f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + + val_i =1+int(100*np.random.random()) + dev.writeInt(13, val_i) + rb_i = dev.readInt(13) + self.assertEqual(val_i, rb_i, f"Integer readback value changed, read {rb_i} instead of {val_i}") + rb_i = dev.readInt(10, 4) + self.assertEqual(val_i, rb_i[3], f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + + val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) + val_s = ''.join(val_s) + dev.writeString(7, val_s) + rb_s = dev.readString(7) + self.assertEqual(val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}") + print("VAR: Done testing VAR module basic functionality!") + + + def test_05_GlobalVariables2(self): + """Test the direct global variable bindings together with the task and + standard global variable interfaces. + """ + + # Throws if IOC is not available or PVs are incorrect + var = aa1GlobalVariables(AA1_IOC_NAME+":VAR:", name="gvar") + var.wait_for_connection() + dev = aa1GlobalVariableBindings(AA1_IOC_NAME+":VAR:", name="var") + dev.wait_for_connection() + tsk = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + tsk.wait_for_connection() + + print("\nVAR: Checking the direct global variable bindings") + # Use random variables for subsequent runs + val_f = 100*np.random.random() + dev.float16.set(val_f, settle_time=0.91).wait() # settle time seems to be ignored by ophyd + + rb_f = dev.float16.value + self.assertEqual(val_f, dev.float16.value, f"Floating point readback value changed, read {rb_f}, expected {val_f}") + #rb_f = dev.readFloat(10, 4) + #self.assertEqual(val_f, rb_f[1], f"Floating point array readback value changed, read {rb_f[1]} instead of {val_f}") + + val_i = 1+int(100*np.random.random()) + dev.int8.set(val_i, settle_time=0.91).wait() + rb_i = dev.int8.value + self.assertEqual(val_i, dev.int8.value, f"Integer readback value changed, read {rb_i}, expected {val_i}") + #rb_i = dev.readInt(10, 4) + #self.assertEqual(val_i, rb_i[3], f"Integer array readback value changed, read {rb_i[3]} instead of {val_i}") + + val_s = np.random.choice(['f', 'o', 'o', 'b', 'a', 'r'], 6) + val_s = ''.join(val_s) + dev.str4.set(val_s, settle_time=0.91).wait() + rb_s = dev.str4.value + self.assertEqual(val_s, rb_s, f"String readback value changed, read {rb_s} instead of {val_s}") + print("VAR: Done testing VAR module direct bindings!") + + + def test_06_MotorRecord(self): + """ Tests the basic move functionality of the MotorRecord + """ + + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + print("\nMR: Checking the standard EPICS motor record") + + # Four test with absolute moves + dist = 30 + 50.0 * np.random.random() + target = mot.position + dist + print(f"Absolute move to {target}, distance: {dist}") + mot.move(target, wait=True) + self.assertTrue(np.abs(target-mot.position)<0.1, f"Final position {mot.position} is too far from target {target} (1)") + + dist = 30 + 50.0 * np.random.random() + target = mot.position - dist + print(f"Absolute move to {target}, distance: {dist}") + mot.move(target, wait=True) + self.assertTrue(np.abs(target-mot.position)<0.1, f"Final position {mot.position} is too far from target {target} (2)") + + dist = 30 + 50.0 * np.random.random() + target = mot.position+dist + print(f"Absolute move to {target}, distance: {dist}") + mot.move(target, wait=True) + self.assertTrue(np.abs(target-mot.position)<0.1, f"Final position {mot.position} is too far from target {target} (3)") + + dist = 30 + 50.0 * np.random.random() + target = mot.position - dist + print(f"Absolute move to {target}, distance: {dist}") + mot.move(target, wait=True) + self.assertTrue(np.abs(target-mot.position)<0.1, f"Final position {mot.position} is too far from target {target} (4)") + print("MR: Done testing MotorRecord!") + + + def test_07_AxisIo(self): + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisIo(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":IO:", name="io") + dev.wait_for_connection() + dev.polllvl.set(1, settle_time=0.5).wait() + print("\nAX:IO: Checking axis IO") + + # Writing random values to analog/digital outputs + for ii in range(5): + val_f = np.random.random() + dev.setAnalog(0, val_f) + self.assertAlmostEqual(dev.ao.get(), val_f, 2, "Unexpected readback on analog output") + print("Digital") + for ii in range(5): + val_b = round(np.random.random()) + dev.setDigital(0, val_b) + self.assertEqual(dev.do.get(), val_b, "Unexpected readback on digital output") + print("IO: Done testing IO module!") + + + def test_08_AxisDdc(self): + """ Drive data collection + + This is the preliminary test case dor drive data collection using + internal PSO based triggers. It tests basic functionality, like + configuration, state monitoring and readback. + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + event = EpicsSignal(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:EVENT:SINGLE", put_complete=True) + dev.wait_for_connection() + event.wait_for_connection() + print("\nAX:DDC: Checking axis DDC") + + # Stop any running acquisition + dev.unstage(settle_time=0.1) + + print("AX:DDC: Running a partial acquisition (1/3)") + # Configure the DDC for 42 points and start waiting for triggers + dev.configure({'npoints': 42, 'trigger': DriveDataCaptureTrigger.PsoEvent}) + dev.kickoff().wait() + # Fire 12 triggers and stop the acquisition early + for ii in range(12): + event.set(1, settle_time=0.02).wait() + # Stop the data acquisition early + dev.unstage() - # Data readback - data = self.data_rb.get() - rows = self.data_rows.get() - cols = self.data_cols.get() - if len(data)==0 or rows==0 or cols==0: + # Wait for polling to catch up + sleep(1) + + # Do the readback and check final state + rbv = yield from dev.collect() + self.assertEqual(dev.nsamples_rbv.value, 12, f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") + self.assertEqual(len(rbv["buffer0"]), 12, f"Expected to read back 12 points by now, got {len(rbv['buffer0'])}") + self.assertEqual(len(rbv["buffer1"]), 12, f"Expected to read back 12 points by now, got {len(rbv['buffer1'])}") + + + print("AX:DDC: Running an overruning acquisition (2/3)") + # Configure the DDC for 42 points and start waiting for triggers + dev.configure(16, trigger=DriveDataCaptureTrigger.PsoEvent) + dev.stage() + num = dev.nsamples_rbv.get() + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") + + # Fire 20 triggers and stop the acquisition early + for ii in range(20): + event.set(1, settle_time=0.02).wait() + dev.unstage() + + # Wait for polling to catch up + sleep(1) + + # Do the readback and check final state + rbv = dev.collect() + self.assertEqual(dev.nsamples_rbv.value, 16, f"Expected to collect 16 points by now, got {dev.nsamples_rbv.value}") + self.assertEqual(len(rbv["buffer0"]), 16, f"Expected to read back 16 points by now, got {len(rbv['buffer0'])}") + self.assertEqual(len(rbv["buffer1"]), 16, f"Expected to read back 16 points by now, got {len(rbv['buffer1'])}") + + + print("AX:DDC: Trying to record some noise on the analog input (3/3)") + dev.configure(36, trigger=DriveDataCaptureTrigger.PsoEvent, source1=DriveDataCaptureInput.AnalogInput0) + dev.stage() + num = dev.nsamples_rbv.get() + self.assertEqual(num, 0, f"Stage should reset the number of DDC points but got: {num}") + # Fire 36 triggers + for ii in range(36): + event.set(1, settle_time=0.02).wait() + dev.unstage() + + # Wait for polling to catch up + sleep(1) + + # Do the readback and check final state + rbv = dev.collect() + self.assertEqual(dev.nsamples_rbv.value, 36, f"Expected to collect 12 points by now, got {dev.nsamples_rbv.value}") + self.assertEqual(len(rbv["buffer0"]), 36, f"Expected to read back 36 points by now, got {len(rbv['buffer0'])}") + self.assertEqual(len(rbv["buffer1"]), 36, f"Expected to read back 36 points by now, got {len(rbv['buffer1'])}") + self.assertTrue(np.std(rbv["buffer1"])>0.000001, f"Expected some noise on analog input, but std is: {np.std(rbv['buffer1'])}") + print("AX:DDC: Done testing DDC module!") + + + def test_09_AxisPsoDistance1(self): + """ Test basic PSO distance mode functionality + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisPsoDistance(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + dev.wait_for_connection() + ddc.wait_for_connection() + print("\nAX:PSO: Checking axis PSO in distance mode (basic)") + if dev.output.value: + dev.toggle() + sleep(1) + + print("AX:PSO: Directly checking basic 'toggled' event generation (1/3)") + # Configure steps and move to middle of range + dev.configure({'distance': 5, 'wmode': "toggle"}) + mot.move(mot.position + 2.5) + dev.stage() + # Start moving in steps, while checking output + for ii in range(10): + last_pso = dev.output.value + mot.move(mot.position + 5) + sleep(1) + self.assertTrue(dev.output.value != last_pso, f"Expected to toggle the PSO output at step {ii}, got {dev.output.value}") + dev.unstage() + ddc.unstage() + + print("AX:PSO: Checking basic 'toggled' event generation with DDC (2/3)") + # Configure steps and move to middle of range + dev.configure({'distance': 2.0, 'wmode': "toggle"}) + dev.stage() + + # Configure DDC + npoints = 720 / 2.0 + ddc.configure({'npoints': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput}) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to start from LOW state") + + # Start moving and wait for polling + mot.move(mot.position + 720) + sleep(0.5) + + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-720/4)<=1, f"Expected to record {720/4} points but got {npoints_rbv}") + dev.unstage() + ddc.unstage() + + + print("AX:PSO: Checking basic 'pulsed' event generation with DDC (3/5)") + # Configure steps and move to middle of range + dev.configure({'distance': 1.8, 'wmode': "pulsed", 'n_pulse': 3}) + dev.kickoff().wait() + + # Configure DDC + npoints = 3 * 720 / 1.8 + ddc.configure({'npoints': npoints+100, 'trigger': DriveDataCaptureTrigger.PsoOutput}) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to start from LOW state") + + # Start moving and wait for polling + mot.move(mot.position + 720.9) + sleep(0.5) + + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-3*720/1.8)<=1, f"Expected to record {3*720/1.8} points but got {npoints_rbv}") + print("AX:PSO: Done testing basic PSO distance functionality!") + dev.unstage() + ddc.unstage() + + + def test_10_PsoIntegration01(self): + """ The first integration test aims to verify PSO functionality under + real life scenarios. It uses the DDC for collecting encoder signals + and compares them to the expected values. + """ + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisPsoDistance(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + dev.wait_for_connection() + ddc.wait_for_connection() + print("\nAX:PSO: Integration test for vector style PSO triggering (advanced)") + + print("AX:PSO: Testing standard 'enable' style PSO triggering with DDC (1/3)") + # This is one line of the Tomcat sequence scan! + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + vec_dist = [acc_dist, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18, 0.01, 18] + dev.configure({'distance': vec_dist, 'wmode': "toggle"}) + if dev.output.value: + dev.toggle() + dev.stage() + + # Configure DDC + npoints_exp = len(vec_dist)/2 + ddc.configure({'npoints': npoints_exp+100, 'trigger': DriveDataCaptureTrigger.PsoOutput}) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to start from LOW state") + + # Start moving and wait for polling + mot.move(mot.position + np.sum(vec_dist) + 10) + sleep(0.9) + + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-npoints_exp)<1, f"Expected to record {npoints_exp} points but got {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to finish in LOW state") + self.assertTrue(dev.dstArrayDepleted.value, "Expected to cover all pints in the distance array") + dev.unstage() + ddc.unstage() + + print("AX:PSO: Checking 'individual' style trigger signal with DDC (2/5)") + # This is practically golden ratio tomography! + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + vec_dist = [acc_dist] + vec_dist.extend([1.618]* 100) + dev.configure({'distance': vec_dist, 'wmode': "pulsed"}) + dev.stage() + + # Configure DDC + npoints_exp = len(vec_dist) + ddc.configure({'npoints': npoints_exp+100, 'trigger': DriveDataCaptureTrigger.PsoOutput}) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to start from LOW state") + + # Start moving and wait for polling (move negative direction) + mot.move(mot.position - np.sum(vec_dist) - acc_dist) + sleep(0.9) + + # Evaluate results + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-npoints_exp)<1, f"Expected to record {npoints_exp} points but got {npoints_rbv}") + self.assertTrue(dev.output.value==0, "Expected to finish in LOW state") + self.assertTrue(dev.dstArrayDepleted.value, "Expected to cover all pints in the distance array") + print("AX:PSO: Done testing PSO module in distance mode!") + dev.unstage() + ddc.unstage() + + + def test_11_AxisPsoWindow1(self): + """ Test basic PSO window mode functionality + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + return + # Throws if IOC is not available or PVs are incorrect + dev = aa1AxisPsoWindow(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + dev.wait_for_connection() + ddc.wait_for_connection() + print("\nAX:PSO: Checking axis PSO in window mode (basic)") + if dev.output.value: + dev.toggle() + + print("AX:PSO: Directly checking basic window output (1/3)") + # Configure steps and move to middle of range + dev.configure((5, 13), "output") + dev.stage() + sleep(1) + # Move half a step + mot.move(mot.position + 0.1) + dist = 0.1 + + # Start moving in steps, while checking output + for ii in range(42): + mot.move(mot.position + 0.2) + dist += 0.2 + sleep(1) + if 5 < dist < 13: + self.assertTrue(dev.output.value==1, f"Expected HIGH PSO output inside the window at distance {dist}") + else: + self.assertTrue(dev.output.value==0, f"Expected LOW PSO output outside the window at distance {dist}") + + print("AX:PSO: The following tests whould fail as multiple triggers/events are being emmitted when entering/exiting the window!") + dev.unstage() + return + + print("AX:PSO: Checking basic window event generation with DDC (2/3)") + print(" WARN: The encoder output is very noisy, one transition generates several events") + # Move to acceleration position + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + print(f"Acceleration distance: {acc_dist}") + start_position = mot.position + mot.move(start_position - acc_dist) + # Configure PSO in window mode + dev.configure((acc_dist, 30 + acc_dist), "pulsed", "Both", n_pulse=1) + # Configure the data capture (must be performed in start position) + ddc.configure(5*10, trigger=DriveDataCaptureTrigger.PsoOutput) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # Repeated line scan + for ii in range(10): + dev.stage() + mot.move(start_position + 30 + acc_dist) + dev.unstage() + mot.move(start_position - acc_dist) + sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-50)<1, f"Expected to record 10 points but got {npoints_rbv}") + + + print("AX:PSO: Checking basic 'toggled' event generation with DDC (3/3)") + print("Test removed as there's some heavy jitter on the PSO output flag") + # Move to acceleration position + acc_dist = 0.5 * mot.velocity.value * mot.acceleration.value + print(f"Acceleration distance: {acc_dist}") + mot.move(mot.position - acc_dist) + # Configure PSO in window mode + dev.configure((acc_dist, 30 + acc_dist + 1), "output") + dev.stage() + # Configure the data capture + ddc.configure(10, trigger=DriveDataCaptureTrigger.PsoOutput) + ddc.stage() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 0, f"DDC.stage should reset the number of DDC points but got: {npoints_rbv}") + # Snake scan + for ii in range(10): + if ii % 2 == 0: + mot.move(mot.position + 30+ 2*acc_dist + 0.1) + else: + mot.move(mot.position - 30 - 2*acc_dist - 0.1) + sleep(0.5) + npoints_rbv = ddc.nsamples_rbv.value + self.assertTrue(np.abs(npoints_rbv-10)<1, f"Expected to record 10 points but got {npoints_rbv}") + + + + def test_12_TomcatSequenceScan(self): + """ Test the zig-zag sequence scan from Tomcat (via BEC) + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. + """ + # Throws if IOC is not available or PVs are incorrect + pso = aa1AxisPsoDistance(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":PSO:", name="pso1") + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") + mot.wait_for_connection() + pso.wait_for_connection() + ddc.wait_for_connection() + t_start = time() + print("\nTOMCAT Sequeence scan (via Ophyd)") + + ScanStart = 42.0 + ScanRange = 180 + NumRepeat = 10 + ScanType = "PosNeg" + + # Dynamic properties + VelScan = 90 + AccScan = 500 + SafeDist = 10.0 + + ###################################################################### + print("\tConfigure") + # Scan range + AccDist = 0.5 * VelScan * VelScan / AccScan + SafeDist + if ScanType in ["PosNeg", "Pos"]: + PosStart = ScanStart - AccDist + PosEnd = ScanStart + ScanRange + AccDist + elif ScanType in ["NegPos", "Neg"]: + PosStart = ScanStart + AccDist + PosEnd = ScanStart - ScanRange - AccDist + else: + raise RuntimeError(f"Unexpected ScanType: {ScanType}") + + # Motor setup + mot.velocity.set(VelScan).wait() + mot.acceleration.set(VelScan/AccScan).wait() + mot.move(PosStart) + self.assertTrue(np.abs(mot.position-PosStart)<0.1, f"Expected to move to scan start position {PosStart}, found motor at {mot.position}") + + # PSO setup + print(f"Configuring PSO to: {[AccDist, ScanRange]}") + pso.configure({'distance': [AccDist, ScanRange], "wmode": "toggle"}) + + # DDC setup (Tomcat runs in ENABLE mode, so only one posedge) + ddc.configure({'npoints': NumRepeat}) + + print("\tStage") + mot.stage() + pso.stage() + ddc.stage() + + for ii in range(NumRepeat): + # No option to reset the index counter... + pso.dstArrayRearm.set(1).wait() + + if ScanType in ["Pos", "Neg"]: + mot.move(PosEnd) + self.assertTrue(np.abs(mot.position-PosEnd)<1.0, f"Expected to reach {PosEnd}, motor is at {mot.position} on iteration {ii}") + mot.move(PosStart) + self.assertTrue(np.abs(mot.position-PosStart)<1.0, f"Expected to rewind to {PosStart}, motor is at {mot.position} on iteration {ii}") + + if ScanType in ["PosNeg", "NegPos"]: + if ii % 2 == 0: + mot.move(PosEnd) + self.assertTrue(np.abs(mot.position-PosEnd)<1.0, f"Expected to reach {PosEnd}, motor is at {mot.position} on iteration {ii}") + else: + mot.move(PosStart) + self.assertAlmostEqual(mot.position, PosStart, 1, f"Expected to reach {PosStart}, motor is at {mot.position} on iteration {ii}") + sleep(0.5) - data = self.data_rb.get() - rows = self.data_rows.get() - cols = self.data_cols.get() - print(f"Data shape: {rows} x {cols}") - data = data.reshape([int(rows), -1]) - return data - - # DAQ data readback - data_rb = Component(EpicsSignalPassive, "DATA", kind=Kind.hinted) - data_rows = Component(EpicsSignalRO, "DATA_ROWS", auto_monitor=True, kind=Kind.hinted) - data_cols = Component(EpicsSignalRO, "DATA_COLS", auto_monitor=True, kind=Kind.hinted) - data_stat = Component(EpicsSignalRO, "DATA_AVG", auto_monitor=True, kind=Kind.hinted) - - def dataReadBack(self) -> np.ndarray: - """Retrieves collected data from the controller""" - data = self.data_rb.get() - rows = self.data_rows.get() - cols = self.data_cols.get() - if len(data)==0 or rows==0 or cols==0: - sleep(0.2) - data = self.data_rb.get() - rows = self.data_rows.get() - cols = self.data_cols.get() - print(f"Data shape: {rows} x {cols}") - data = data.reshape([int(rows), -1]) - return data - + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, ii+1, f"Expected to record {ii+1} DDC points, but got: {npoints_rbv} on iteration {ii}") + print("\tUnstage") + mot.unstage() + ddc.unstage() + pso.unstage() -class aa1GlobalVariables(Device): - """ Global variables - - This class provides an interface to directly read/write global variables - on the Automation1 controller. These variables are accesible from script - files and are thus a convenient way to interface with the outside word. - - Read operations take as input the memory address and the size - Write operations work with the memory address and value - """ - # Status monitoring - num_real = Component(EpicsSignalRO, "NUM-REAL_RBV", kind=Kind.config) - num_int = Component(EpicsSignalRO, "NUM-INT_RBV", kind=Kind.config) - num_string = Component(EpicsSignalRO, "NUM-STRING_RBV", kind=Kind.config) - - integer_addr = Component(EpicsSignal, "INT-ADDR", kind=Kind.omitted, put_complete=True) - integer_size = Component(EpicsSignal, "INT-SIZE", kind=Kind.omitted, put_complete=True) - integer = Component(EpicsSignal, "INT", kind=Kind.omitted, put_complete=True) - integer_rb = Component(EpicsPassiveRO, "INT-RBV", kind=Kind.omitted) - integerarr = Component(EpicsSignal, "INTARR", kind=Kind.omitted, put_complete=True) - integerarr_rb = Component(EpicsPassiveRO, "INTARR-RBV", kind=Kind.omitted) - - real_addr = Component(EpicsSignal, "REAL-ADDR", kind=Kind.omitted, put_complete=True) - real_size = Component(EpicsSignal, "REAL-SIZE", kind=Kind.omitted, put_complete=True) - real = Component(EpicsSignal, "REAL", kind=Kind.omitted, put_complete=True) - real_rb = Component(EpicsPassiveRO, "REAL-RBV", kind=Kind.omitted) - realarr = Component(EpicsSignal, "REALARR", kind=Kind.omitted, put_complete=True) - realarr_rb = Component(EpicsPassiveRO, "REALARR-RBV", kind=Kind.omitted) - - string_addr = Component(EpicsSignal, "STRING-ADDR", kind=Kind.omitted, put_complete=True) - string = Component(EpicsSignal, "STRING", string=True, kind=Kind.omitted, put_complete=True) - string_rb = Component(EpicsPassiveRO, "STRING-RBV", string=True, kind=Kind.omitted) - - def readInt(self, address: int, size: int=None) -> int: - """ Read a 64-bit integer global variable """ - if address > self.num_int.get(): - raise RuntimeError("Integer address {address} is out of range") - - if size is None: - self.integer_addr.set(address).wait() - return self.integer_rb.get() - else: - self.integer_addr.set(address).wait() - self.integer_size.set(size).wait() - return self.integerarr_rb.get() - - def writeInt(self, address: int, value) -> None: - """ Write a 64-bit integer global variable """ - if address > self.num_int.get(): - raise RuntimeError("Integer address {address} is out of range") - - if isinstance(value, (int, float)): - self.integer_addr.set(address).wait() - self.integer.set(value).wait() - elif isinstance(value, np.ndarray): - self.integer_addr.set(address).wait() - self.integerarr.set(value).wait() - elif isinstance(value, (list, tuple)): - value = np.array(value, dtype=np.int32) - self.integer_addr.set(address).wait() - self.integerarr.set(value).wait() - else: - raise RuntimeError("Unsupported integer value type: {type(value)}") + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") + + print("Evaluate final state") + sleep(3) + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, NumRepeat, f"Expected to record {NumRepeat} DDC points, but got: {npoints_rbv}") - def readFloat(self, address: int, size: int=None) -> float: - """ Read a 64-bit double global variable """ - if address > self.num_real.get(): - raise RuntimeError("Floating point address {address} is out of range") - - if size is None: - self.real_addr.set(address).wait() - return self.real_rb.get() - else: - self.real_addr.set(address).wait() - self.real_size.set(size).wait() - return self.realarr_rb.get() - - def writeFloat(self, address: int, value) -> None: - """ Write a 64-bit float global variable """ - if address > self.num_real.get(): - raise RuntimeError("Float address {address} is out of range") - - if isinstance(value, (int, float)): - self.real_addr.set(address).wait() - self.real.set(value).wait() - elif isinstance(value, np.ndarray): - self.real_addr.set(address).wait() - self.realarr.set(value).wait() - elif isinstance(value, (list, tuple)): - value = np.array(value) - self.real_addr.set(address).wait() - self.realarr.set(value).wait() - else: - raise RuntimeError("Unsupported float value type: {type(value)}") - - def readString(self, address: int) -> str: - """ Read a 40 letter string global variable - ToDo: Automation 1 strings are 256 bytes + def test_13_TomcatSequenceScan(self): + """ Test the zig-zag sequence scan from Tomcat (via AeroScript) + + This module tests basic PSO distance functionality, like events and + waveform generation. The goal is to ensure the basic operation of + the PSO module and the polled feedback. """ - if address > self.num_string.get(): - raise RuntimeError("String address {address} is out of range") - - self.string_addr.set(address).wait() - return self.string_rb.get() - - def writeString(self, address: int, value) -> None: - """ Write a 40 bytes string global variable """ - if address > self.num_string.get(): - raise RuntimeError("Integer address {address} is out of range") - - if isinstance(value, str): - self.string_addr.set(address).wait() - self.string.set(value).wait() - else: - raise RuntimeError("Unsupported string value type: {type(value)}") - + # Throws if IOC is not available or PVs are incorrect + dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + tsk4 = aa1TaskState(AA1_IOC_NAME+ ":TASK:T4:", name="tsk4") + dev.wait_for_connection() + tsk4.wait_for_connection() -class aa1GlobalVariableBindings(Device): - """ Global variables - - This class provides an interface to directly read/write global variables - on the Automation1 controller. These variables are accesible from script - files and are thus a convenient way to interface with the outside word. - - Read operations take as input the memory address and the size - Write operations work with the memory address and value - """ - int0 = Component(EpicsSignalRO, "INT0_RBV", auto_monitor=True, name="int0", kind=Kind.hinted) - int1 = Component(EpicsSignalRO, "INT1_RBV", auto_monitor=True, name="int1", kind=Kind.hinted) - int2 = Component(EpicsSignalRO, "INT2_RBV", auto_monitor=True, name="int2", kind=Kind.hinted) - int3 = Component(EpicsSignalRO, "INT3_RBV", auto_monitor=True, name="int3", kind=Kind.hinted) - int8 = Component(EpicsSignal, "INT8_RBV", put_complete=True, write_pv="INT8", auto_monitor=True, name="int8", kind=Kind.hinted) - int9 = Component(EpicsSignal, "INT9_RBV", put_complete=True, write_pv="INT9", auto_monitor=True, name="int9", kind=Kind.hinted) - int10 = Component(EpicsSignal, "INT10_RBV", put_complete=True, write_pv="INT10", auto_monitor=True, name="int10", kind=Kind.hinted) - int11 = Component(EpicsSignal, "INT11_RBV", put_complete=True, write_pv="INT11", auto_monitor=True, name="int11", kind=Kind.hinted) - - float0 = Component(EpicsSignalRO, "REAL0_RBV", auto_monitor=True, name="float0", kind=Kind.hinted) - float1 = Component(EpicsSignalRO, "REAL1_RBV", auto_monitor=True, name="float1", kind=Kind.hinted) - float2 = Component(EpicsSignalRO, "REAL2_RBV", auto_monitor=True, name="float2", kind=Kind.hinted) - float3 = Component(EpicsSignalRO, "REAL3_RBV", auto_monitor=True, name="float3", kind=Kind.hinted) - float16 = Component(EpicsSignal, "REAL16_RBV", write_pv="REAL16", put_complete=True, auto_monitor=True, name="float16", kind=Kind.hinted) - float17 = Component(EpicsSignal, "REAL17_RBV", write_pv="REAL17", put_complete=True, auto_monitor=True, name="float17", kind=Kind.hinted) - float18 = Component(EpicsSignal, "REAL18_RBV", write_pv="REAL18", put_complete=True, auto_monitor=True, name="float18", kind=Kind.hinted) - float19 = Component(EpicsSignal, "REAL19_RBV", write_pv="REAL19", put_complete=True, auto_monitor=True, name="float19", kind=Kind.hinted) - - # BEC LiveTable crashes on non-numeric values - str0 = Component(EpicsSignalRO, "STR0_RBV", auto_monitor=True, string=True, name="str0", kind=Kind.config) - str1 = Component(EpicsSignalRO, "STR1_RBV", auto_monitor=True, string=True, name="str1", kind=Kind.config) - str4 = Component(EpicsString, "STR4_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR4", name="str4", kind=Kind.config) - str5 = Component(EpicsString, "STR5_RBV", put_complete=True, string=True, auto_monitor=True, write_pv="STR5", name="str5", kind=Kind.config) - - - -class aa1AxisIo(Device): - """ Analog / digital Input-Output - - This class provides convenience wrappers around the Aerotech API's axis - specific IO functionality. Note that this is a low-speed API, actual work - should be done in AeroScript. Only one IO pin is polled simultaneously! - """ - polllvl = Component(EpicsSignal, "POLLLVL", put_complete=True, kind=Kind.config) - - ai_addr = Component(EpicsSignal, "AI-ADDR", put_complete=True, kind=Kind.config) - ai = Component(EpicsSignalRO, "AI-RBV", auto_monitor=True, kind=Kind.hinted) - - ao_addr = Component(EpicsSignal, "AO-ADDR", put_complete=True, kind=Kind.config) - ao = Component(EpicsSignal, "AO-RBV", write_pv="AO", auto_monitor=True, kind=Kind.hinted) - - di_addr = Component(EpicsSignal, "DI-ADDR", put_complete=True, kind=Kind.config) - di = Component(EpicsSignalRO, "DI-RBV", auto_monitor=True, kind=Kind.hinted) - - do_addr = Component(EpicsSignal, "DO-ADDR", put_complete=True, kind=Kind.config) - do = Component(EpicsSignal, "DO-RBV", write_pv="DO", auto_monitor=True, kind=Kind.hinted) - - def setAnalog(self, pin: int, value: float, settle_time=0.05): - # Set the address - self.ao_addr.set(pin).wait() - # Set the voltage - self.ao.set(value, settle_time=settle_time).wait() - - def setDigital(self, pin: int, value: int, settle_time=0.05): - # Set the address - self.do_addr.set(pin).wait() - # Set the voltage - self.do.set(value, settle_time=settle_time).wait() + t_start = time() + print("\nTOMCAT Sequeence scan (via AeroScript)") + filename = "test/testScan02.ascript" + with open(filename) as f: + text = f.read() + # Copy file to controller and run it + dev.runScript("tcatZigZagSequence2.ascript", taskIndex=4, filetext=text) + sleep(0.5) + tsk4.complete().wait() -class aa1AxisPsoBase(Device): - """ Position Sensitive Output - Base class - - This class provides convenience wrappers around the Aerotech IOC's PSO - functionality. As a base class, it's just a collection of PVs without - significant logic (that should be implemented in the child classes). - It uses event-waveform concept to produce signals on the configured - output pin: a specified position based event will trigger the generation - of a waveform on the oputput that can be either used as exposure enable, - as individual trigger or as a series of triggers per each event. - As a first approach, the module follows a simple pipeline structure: - Genrator --> Event --> Waveform --> Output - - Specific operation modes should be implemented in child classes. - """ - # ######################################################################## - # General module status - status = Component(EpicsSignalRO, "STATUS", auto_monitor=True, kind=Kind.hinted) - output = Component(EpicsSignalRO, "OUTPUT-RBV", auto_monitor=True, kind=Kind.hinted) - _eventSingle = Component(EpicsSignal, "EVENT:SINGLE", put_complete=True, kind=Kind.omitted) - _reset = Component(EpicsSignal, "RESET", put_complete=True, kind=Kind.omitted) - posInput = Component(EpicsSignal, "DIST:INPUT", put_complete=True, kind=Kind.omitted) - - # ######################################################################## - # PSO Distance event module - dstEventsEna = Component(EpicsSignal, "DIST:EVENTS", put_complete=True, kind=Kind.omitted) - dstCounterEna = Component(EpicsSignal, "DIST:COUNTER", put_complete=True, kind=Kind.omitted) - dstCounterVal = Component(EpicsSignalRO, "DIST:CTR0_RBV", auto_monitor=True, kind=Kind.hinted) - dstArrayDepleted = Component(EpicsSignalRO, "DIST:ARRAY-DEPLETED-RBV", auto_monitor=True, kind=Kind.hinted) - - dstDirection = Component(EpicsSignal, "DIST:EVENTDIR", put_complete=True, kind=Kind.omitted) - dstDistance = Component(EpicsSignal, "DIST:DISTANCE", put_complete=True, kind=Kind.hinted) - dstDistanceArr = Component(EpicsSignal, "DIST:DISTANCES", put_complete=True, kind=Kind.omitted) - dstArrayRearm = Component(EpicsSignal, "DIST:REARM-ARRAY", put_complete=True, kind=Kind.omitted) - - # ######################################################################## - # PSO Window event module - winEvents = Component(EpicsSignal, "WINDOW:EVENTS", put_complete=True, kind=Kind.omitted) - winOutput = Component(EpicsSignal, "WINDOW0:OUTPUT", put_complete=True, kind=Kind.omitted) - winInput = Component(EpicsSignal, "WINDOW0:INPUT", put_complete=True, kind=Kind.omitted) - winCounter = Component(EpicsSignal, "WINDOW0:COUNTER", put_complete=True, kind=Kind.omitted) - _winLower = Component(EpicsSignal, "WINDOW0:LOWER", put_complete=True, kind=Kind.omitted) - _winUpper = Component(EpicsSignal, "WINDOW0:UPPER", put_complete=True, kind=Kind.omitted) - - # ######################################################################## - # PSO waveform module - waveEnable = Component(EpicsSignal, "WAVE:ENABLE", put_complete=True, kind=Kind.omitted) - waveMode = Component(EpicsSignal, "WAVE:MODE", put_complete=True, kind=Kind.omitted) - #waveDelay = Component(EpicsSignal, "WAVE:DELAY", put_complete=True, kind=Kind.omitted) - - # PSO waveform pulse output - #pulseTrunc = Component(EpicsSignal, "WAVE:PULSE:TRUNC", put_complete=True, kind=Kind.omitted) - pulseOnTime = Component(EpicsSignal, "WAVE:PULSE:ONTIME", put_complete=True, kind=Kind.omitted) - pulseWindow = Component(EpicsSignal, "WAVE:PULSE:PERIOD", put_complete=True, kind=Kind.omitted) - pulseCount = Component(EpicsSignal, "WAVE:PULSE:COUNT", put_complete=True, kind=Kind.omitted) - pulseApply = Component(EpicsSignal, "WAVE:PULSE:APPLY", put_complete=True, kind=Kind.omitted) - - # ######################################################################## - # PSO output module - outPin = Component(EpicsSignal, "PIN", put_complete=True, kind=Kind.omitted) - outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.omitted) - - def fire(self, settle_time=None): - """ Fire a single PSO event (i.e. manual software trigger)""" - self._eventSingle.set(1, settle_time=settle_time).wait() - - def toggle(self): - orig_waveMode = self.waveMode.get() - self.waveMode.set("Toggle").wait() - self.fire(0.1) - self.waveMode.set(orig_waveMode).wait() - - - - - -class aa1AxisPsoDistance(aa1AxisPsoBase): - """ Position Sensitive Output - Distance mode - - This class provides convenience wrappers around the Aerotech API's PSO - functionality in distance mode. It uses event-waveform concept to produce - signals on the configured output pin: a specified position based event - will trigger the generation af a waveform on the oputput that can be either - used as exposure enable, as individual trigger or as a series of triggers - per each event. - As a first approach, the module follows a simple pipeline structure: - Genrator --> Event --> Waveform --> Output - - The module provides configuration interface to common functionality, such - as fixed distance or array based triggering and can serve as a base for - future advanced functionality. The relative distances ease the limitations - coming from 32 bit PSO positions. - For a more detailed description of additional signals and masking plase - refer to Automation1's online manual. - """ - - def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): - """ __init__ MUST have a full argument list""" - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self._Vdistance = 3.141592 + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") - # ######################################################################## - # PSO high level interface - def configure(self, d: dict={}) -> tuple: - """ Simplified configuration interface to access the most common - functionality for distance mode PSO. - - :param distance: The trigger distance or the array of distances between subsequent points. - :param wmode: Waveform mode configuration, usually pulsed/toggled. - """ - distance = d['distance'] - wmode = str(d['wmode']) - t_pulse = float(d['t_pulse']) if 't_pulse' in d else None - w_pulse = float(d['w_pulse']) if 'w_pulse' in d else None - n_pulse = float(d['n_pulse']) if 'n_pulse' in d else None - - - # Validate input parameters - if wmode not in ["pulse", "pulsed", "toggle", "toggled"]: - raise RuntimeError(f"Unsupported distace triggering mode: {wmode}") - - old = self.read_configuration() - # Configure distance generator (also resets counter to 0) - self._distanceValue = distance - if isinstance(distance, (float, int)): - self.dstDistance.set(distance).wait() - elif isinstance(distance, (np.ndarray, list, tuple)): - self.dstDistanceArr.set(distance).wait() - - self.winEvents.set("Off").wait() - self.dstCounterEna.set("Off").wait() - self.dstEventsEna.set("Off").wait() - - # Configure the pulsed/toggled waveform - if wmode in ["toggle", "toggled"]: - # Switching to simple toggle mode - self.waveEnable.set("On").wait() - self.waveMode.set("Toggle").wait() - - elif wmode in ["pulse", "pulsed"]: - # Switching to pulsed mode - self.waveEnable.set("On").wait() - self.waveMode.set("Pulse").wait() - # Setting pulse shape - if w_pulse is not None: - self.pulseWindow.set(w_pulse).wait() - if t_pulse is not None: - self.pulseOnTime.set(t_pulse).wait() - if n_pulse is not None: - self.pulseCount.set(n_pulse).wait() - # Commiting configuration - self.pulseApply.set(1).wait() - # Enabling PSO waveform outputs - self.waveEnable.set("On").wait() - else: - raise RuntimeError(f"Unsupported waveform mode: {wmode}") - - # Ensure output is set to low - if self.output.value: - self.toggle() - - # Set PSO output data source - self.outSource.set("Waveform").wait() - - new = self.read_configuration() - return (old, new) - - # ######################################################################## - # Bluesky step scan interface - def stage(self, settle_time=None): - self.dstEventsEna.set("On").wait() - if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)): - self.dstArrayRearm.set(1).wait() - self.dstCounterEna.set("On").wait() - if settle_time is not None: - sleep(settle_time) - return super().stage() - - def trigger(self): - return super().trigger() + # Do the readback and check final state + ddc = aa1AxisDriveDataCollection(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME + ":DDC:", name="ddc") + ddc.wait_for_connection() + rbv = yield from ddc.collect() + npoints_rbv = ddc.nsamples_rbv.value + self.assertEqual(npoints_rbv, 10, f"Expected to collect 10 points by now, got {npoints_rbv}") + self.assertEqual(len(rbv["buffer0"]), 10, f"Expected to read back 10 points by now, got {len(rbv['buffer0'])}") + self.assertEqual(len(rbv["buffer1"]), 10, f"Expected to read back 10 points by now, got {len(rbv['buffer1'])}") + self.assertTrue(np.std(rbv["buffer0"])>5, f"Expected some noise on analog input, but std is: {np.std(rbv['buffer1'])}") + self.assertTrue(np.std(rbv["buffer1"])>5, f"Expected some noise on analog input, but std is: {np.std(rbv['buffer1'])}") + print("AX:DDC: Done testing DDC module!") - def unstage(self): - # Turn off counter monitoring - self.dstEventsEna.set("Off").wait() - self.dstCounterEna.set("Off").wait() - return super().unstage() - # ######################################################################## - # Bluesky flyer interface - def kickoff(self) -> DeviceStatus: - # Rearm the configured array - if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)): - self.dstArrayRearm.set(1).wait() - # Start monitoring the counters - self.dstEventsEna.set("On").wait() - self.dstCounterEna.set("On").wait() - status = DeviceStatus(self) - status.set_finished() - return status - - def complete(self) -> DeviceStatus: - """ Bluesky flyer interface""" - # Array mode waits until the buffer is empty - if hasattr(self, "_distanceValue") and isinstance(self._distanceValue, (np.ndarray, list, tuple)): - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - def notRunning(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_== 0) else bool(int(value) & 0x1000) - print(f"Old {old_value}\tNew: {value}\tResult: {result}") - timestamp_ = timestamp - return result - - # Subscribe and wait for update - status = SubscriptionStatus(self.status, notRunning, settle_time=0.5) - else: - # In distance trigger mode there's no specific goal - status = DeviceStatus(self) - status.set_finished() - return status - - def describe_collect(self) -> OrderedDict: - ret = OrderedDict() - ret['index'] = {'source': "internal", 'dtype': 'integer', 'shape': [], 'units': '', 'lower_ctrl_limit': 0, 'upper_ctrl_limit': 0} - return {self.name: ret} + def test_14_JinjaTomcatSequenceScan(self): + return + # Load the test file + filename = "test/testSequenceScanTemplate.ascript" + with open(filename) as f: + templatetext = f.read() - def collect(self) -> OrderedDict: - ret = OrderedDict() - ret["timestamps"] = {"index": time.time()} - ret["data"] = {"index": self.dstCounterVal.value } - yield ret - - - - -class aa1AxisPsoWindow(aa1AxisPsoBase): - """ Position Sensitive Output - Window mode - - This class provides convenience wrappers around the Aerotech API's PSO - functionality in window mode. It can either use the event-waveform concept - or provide a direct window output signal (in/out) to the output pin. The - latter is particularly well-suited for the generation of trigger enable - signals, while in event mode it allows the finetuning of trigger lenth. - As a first approach, the module follows a simple pipeline structure: - Genrator --> Event --> Waveform --> Output pin - Genrator --> Window output --> Output pin - - The module provides configuration interface to common functionality, such - as repeated trigger enable signal or fixed area scaning. Unfortunately the - entered positions are absolute, meaning this mode has an inherent limitation - with encoder counters being kept in 32 bit integers. - For a more detailed description of additional signals and masking plase - refer to Automation1's online manual. - """ - def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): - """ __init__ MUST have a full argument list""" - super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self._mode = "output" - self._eventMode = "Enter" - - # ######################################################################## - # PSO high level interface - def configure(self, d: dict={}) -> tuple: - """ Simplified configuration interface to access the most common - functionality for distance mode PSO. - - :param distance: The trigger distance or the array of distances between subsequent points. - :param wmode: Waveform mode configuration, usually output/pulsed/toggled. - """ - bounds = d['distance'] - wmode = str(d['wmode']) - emode = str(d['emode']) - t_pulse = float(d['t_pulse']) if 't_pulse' in d else None - w_pulse = float(d['w_pulse']) if 'w_pulse' in d else None - n_pulse = float(d['n_pulse']) if 'n_pulse' in d else None - - # Validate input parameters - if wmode not in ["pulse", "pulsed", "toggle", "toggled", "output", "flag"]: - raise RuntimeError(f"Unsupported window triggering mode: {wmode}") - self._mode = wmode - self._eventMode = emode + import jinja2 + scanconfig = {'startpos': 42, 'scanrange': 180, 'nrepeat': 10, 'scandir': 'NegPos', 'scanvel': 90, 'scanacc': 500} + tm = jinja2.Template(templatetext) + text = tm.render(scan=scanconfig) + with open("test/tcatSequenceScan.ascript", "w") as text_file: + text_file.write(text) - old = self.read_configuration() - - # Configure the window module - # Set the window ranges (MUST be in start position) - if len(bounds) == 2: - self.winCounter.set(0).wait() - self._winLower.set(bounds[0]).wait() - self._winUpper.set(bounds[1]).wait() - - elif isinstance(bounds, np.ndarray): - # ToDo... - pass - - # Don't start triggering just yet - self.winOutput.set("Off").wait() - self.winEvents.set("Off").wait() - - # Configure the pulsed/toggled waveform - if wmode in ["toggle", "toggled"]: - # Switching to simple toggle mode - self.waveEnable.set("On").wait() - self.waveMode.set("Toggle").wait() - elif wmode in ["pulse", "pulsed"]: - # Switching to pulsed mode - self.waveEnable.set("On").wait() - self.waveMode.set("Pulse").wait() - # Setting pulse shape - self.pulseWindow.set(w_pulse).wait() - self.pulseOnTime.set(t_pulse).wait() - self.pulseCount.set(n_pulse).wait() - # Commiting configuration - self.pulseApply.set(1).wait() - # Enabling PSO waveform outputs - self.waveEnable.set("On").wait() - elif wmode in ["output", "flag"]: - self.waveEnable.set("Off").wait() - else: - raise RuntimeError(f"Unsupported window mode: {wmode}") - - # Set PSO output data source - if wmode in ["toggle", "toggled", "pulse", "pulsed"]: - self.outSource.set("Waveform").wait() - elif wmode in ["output", "flag"]: - self.outSource.set("Window").wait() - - new = self.read_configuration() - return (old, new) - - def stage(self, settle_time=None): - if self.outSource.get() in ["Window", 2]: - self.winOutput.set("On").wait() - else: - self.winEvents.set(self._eventMode).wait() - if settle_time is not None: - sleep(settle_time) - return super().stage() - - def kickoff(self, settle_time=None): - if self.outSource.get() in ["Window", 2]: - self.winOutput.set("On").wait() - else: - self.winEvents.set(self._eventMode).wait() - if settle_time is not None: - sleep(settle_time) + # Throws if IOC is not available or PVs are incorrect + dev = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") + tsk4 = aa1TaskState(AA1_IOC_NAME+ ":TASK:T4:", name="tsk4") + dev.wait_for_connection() + tsk4.wait_for_connection() + print("\nTOMCAT Sequeence scan (via Jinjad AeroScript)") - def unstage(self, settle_time=None): - self.winOutput.set("Off").wait() - self.winEvents.set("Off").wait() - if settle_time is not None: - sleep(settle_time) - return super().unstage() + # Copy file to controller and run it + t_start = time() + dev.runScript("tcatSequenceScan.ascript", taskIndex=4, filetext=text) + sleep(0.5) + tsk4.complete().wait() + t_end = time() + t_elapsed = t_end - t_start + print(f"Elapsed scan time: {t_elapsed}") - - - -class aa1AxisDriveDataCollection(Device): - """ Axis data collection - - This class provides convenience wrappers around the Aerotech API's axis - specific data collection functionality. This module allows to record - hardware synchronized signals with up to 200 kHz. - - The default configuration is using a fixed memory mapping allowing up to - 1 million recorded data points on an XC4e (this depends on controller). - """ - - # ######################################################################## - # General module status - nsamples_rbv = Component(EpicsSignalRO, "SAMPLES_RBV", auto_monitor=True, kind=Kind.hinted) - _switch = Component(EpicsSignal, "ACQUIRE", put_complete=True, kind=Kind.omitted) - _input0 = Component(EpicsSignal, "INPUT0", put_complete=True, kind=Kind.config) - _input1 = Component(EpicsSignal, "INPUT1", put_complete=True, kind=Kind.config) - _trigger = Component(EpicsSignal, "TRIGGER", put_complete=True, kind=Kind.config) - - npoints = Component(EpicsSignal, "NPOINTS", put_complete=True, kind=Kind.config) - _readback0 = Component(EpicsSignal, "AREAD0", kind=Kind.omitted) - _readstatus0 = Component(EpicsSignalRO, "AREAD0_RBV", auto_monitor=True, kind=Kind.omitted) - _readback1 = Component(EpicsSignal, "AREAD1", kind=Kind.omitted) - _readstatus1 = Component(EpicsSignalRO, "AREAD1_RBV", auto_monitor=True, kind=Kind.omitted) - - _buffer0 = Component(EpicsSignalRO, "BUFFER0", auto_monitor=True, kind=Kind.hinted) - _buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.hinted) - - def configure(self, d: dict={}) -> tuple: - npoints = int(d['npoints']) - trigger = int(d['trigger']) if 'trigger' in d else DriveDataCaptureTrigger.PsoOutput - source0 = int(d['source0']) if 'source0' in d else DriveDataCaptureInput.PrimaryFeedback - source1 = int(d['source1']) if 'source1' in d else DriveDataCaptureInput.PositionCommand - - old = self.read_configuration() - - self._input0.set(source0).wait() - self._input1.set(source1).wait() - self._trigger.set(trigger).wait() - # This allocates the memory... - self.npoints.set(npoints).wait() - - new = self.read_configuration() - return (old, new) - - # Bluesky step scanning interface - def stage(self, settle_time=0.1): - super().stage() - self._switch.set("Start", settle_time=0.5).wait() - status = Status(timeout=0.1, settle_time=settle_time).set_finished() - return status - - def unstage(self, settle_time=0.1): - self._switch.set("Stop", settle_time=settle_time).wait() - super().unstage() - print(f"Recorded samples: {self.nsamples_rbv.value}") - - # Bluesky flyer interface - def kickoff(self, settle_time=0.1) -> Status: - status = self._switch.set("Start", settle_time=settle_time) - return status - - def complete(self, settle_time=0.1) -> DeviceStatus: - """ DDC just reads back whatever is available in the buffers""" - status = DeviceStatus(self, settle_time=settle_time) - status.set_finished() - return status - - def _collect(self, index=0): - """ Force a readback of the data buffer - - Note that there's a weird behaviour in ophyd that it issues an - initial update event with the initial value but 0 timestamp. Theese - old_values are invalid and must be filtered out. - """ - - # Define wait until the busy flag goes down (excluding initial update) - timestamp_ = 0 - def negEdge(*args, old_value, value, timestamp, **kwargs): - nonlocal timestamp_ - result = False if (timestamp_== 0) else (old_value == 1 and value == 0) - # print(f"\nBuffer1 status:\t{old_value} ({timestamp_}) to {value} ({timestamp}) Result: {result}") - timestamp_ = timestamp - return result - - if index==0: - status = SubscriptionStatus(self._readstatus0, negEdge, settle_time=0.5) - self._readback0.set(1).wait() - elif index==1: - status = SubscriptionStatus(self._readstatus1, negEdge, settle_time=0.5) - self._readback1.set(1).wait() - - # Start asynchronous readback - status.wait() - return status - - - def describe_collect(self) -> OrderedDict: - ret = OrderedDict() - ret['buffer0'] = {'source': "internal", 'dtype': 'array', 'shape': [], 'units': '', 'lower_ctrl_limit': 0, 'upper_ctrl_limit': 0} - ret['buffer1'] = {'source': "internal", 'dtype': 'array', 'shape': [], 'units': '', 'lower_ctrl_limit': 0, 'upper_ctrl_limit': 0} - return {self.name: ret} - - def collect(self) -> OrderedDict: - - self._collect(0).wait() - self._collect(1).wait() - - b0 = self._buffer0.value - b1 = self._buffer1.value - ret = OrderedDict() - ret["timestamps"] = {"buffer0": time.time(), "buffer1": time.time() } - ret["data"] = {"buffer0": b0, "buffer1": b1 } - yield ret - - - - - -# Automatically start simulation if directly invoked if __name__ == "__main__": - AA1_IOC_NAME = "X02DA-ES1-SMP1" - AA1_AXIS_NAME = "ROTY" - # Drive data collection - task = aa1Tasks(AA1_IOC_NAME+":TASK:", name="tsk") - task.wait_for_connection() - task.describe() - ddc = aa1AxisDriveDataCollection("X02DA-ES1-SMP1:ROTY:DDC:", name="ddc") - ddc.wait_for_connection() - globb = aa1GlobalVariableBindings(AA1_IOC_NAME+":VAR:", name="globb") - globb.wait_for_connection() - globb.describe() - mot = EpicsMotor(AA1_IOC_NAME+ ":" + AA1_AXIS_NAME, name="x") - mot.wait_for_connection() + unittest.main() + + + + + + + + + + + + + + + + + + + + +