import sys import time import math from array import array import java.lang.Class as Class import org.python.core.PyArray as PyArray import ch.psi.pshell.dev.MotorGroupBase.MoveMode as MoveMode import ch.psi.pshell.scan.LineScan import ch.psi.pshell.scan.AreaScan import ch.psi.pshell.scan.VectorScan import ch.psi.pshell.dev.Readable as Readable import ch.psi.pshell.epics.Epics as Epics import ch.psi.pshell.epics.EpicsScan as EpicsScan def onScanBeforeReadout(scan): try: if (scan.before_read!=None): scan.before_read() except AttributeError: pass def onScanAfterReadout(scan): try: if (scan.after_read!=None): scan.after_read() except AttributeError: pass class LineScan(ch.psi.pshell.scan.LineScan): def onBeforeReadout(self): onScanBeforeReadout(self) def onAfterReadout(self): onScanAfterReadout(self) class AreaScan(ch.psi.pshell.scan.AreaScan): def onBeforeReadout(self): onScanBeforeReadout(self) def onAfterReadout(self): onScanAfterReadout(self) class VectorScan(ch.psi.pshell.scan.VectorScan): def onBeforeReadout(self): onScanBeforeReadout(self) def onAfterReadout(self): onScanAfterReadout(self) def sleep(sec): time.sleep(sec) def toList(obj): if isinstance(obj,tuple): return list(obj) if not isinstance(obj,list): return [obj,] return obj def lscan(writables, readables, start, end, steps, latency=0.0, plot=None, before_read=None, after_read=None): """Line Scan: positioners change together, linearly from start to end positions. Args: writables(list of Writable): Positioners set on each step. readables(list of Readable): Sensors to be sampled on each step. start(list of float): start positions of writables. end(list of float): final positions of writables. steps(int): number of scan steps. latency(float, optional): sleep time in each step before readout, defaults to 0.0. plot(str, optional): plotting context name. before_read (function): callback on each step, before each readout. after_read (function): callback on each step, after each readout. Returns: ArrayList of ScanRecord objects. """ latency_ms=int(latency*1000) writables=toList(writables) readables=toList(readables) start=toList(start) end=toList(end) scan = LineScan(writables,readables, start, end , steps,latency_ms, controller) scan.before_read=before_read scan.after_read=after_read scan.setPlotName(plot) scan.start() return scan.getResult() def vscan(writables, readables, vector, latency=0.0, plot=None, before_read=None, after_read=None): """Vector Scan: positioners change following values provided in a vector. Args: writables(list of Writable): Positioners set on each step. readables(list of Readable): Sensors to be sampled on each step. vector(list of list of float): table of positioner values. latency(float, optional): sleep time in each step before readout, defaults to 0.0. plot(str, optional): plotting context name. before_read (function): callback on each step, before each readout. after_read (function): callback on each step, after each readout. Returns: ArrayList of ScanRecord objects. """ latency_ms=int(latency*1000) writables=toList(writables) readables=toList(readables) scan = VectorScan(writables,readables, vector, latency_ms, controller) scan.before_read=before_read scan.after_read=after_read scan.setPlotName(plot) scan.start() return scan.getResult() def ascan(writables, readables, start, end, steps, latency=0.0, plot=None, before_read=None, after_read=None): """Area Scan: multi-dimentional scan, each positioner is a dimention. Args: writables(list of Writable): Positioners set on each step. readables(list of Readable): Sensors to be sampled on each step. start(list of float): start positions of writables. end(list of float): final positions of writables. steps(list of int): number of scan steps for each positioner. latency(float, optional): sleep time in each step before readout, defaults to 0.0. plot(str, optional): plotting context name. before_read (function): callback on each step, before each readout. after_read (function): callback on each step, after each readout. Returns: ArrayList of ScanRecord objects. """ latency_ms=int(latency*1000) writables=toList(writables) readables=toList(readables) start=toList(start) end=toList(end) scan = AreaScan(writables,readables, start, end , steps,latency_ms, controller) scan.before_read=before_read scan.after_read=after_read scan.setPlotName(plot) scan.start() return scan.getResult() def tscan(readables, points, interval, plot=None, before_read=None, after_read=None): """Time Scan: sensors are sampled in fixed time intervals. Args: readables(list of Readable): Sensors to be sampled on each step. points(int): number of samples. interval(float): time interval between readouts. plot(str, optional): plotting context name. before_read (function): callback on each step, before each readout. after_read (function): callback on each step, after each readout. Returns: ArrayList of ScanRecord objects. """ latency_ms=int(interval*1000) writables=[] readables=toList(readables) start=[0,] end=[points,] steps=points scan = LineScan(writables,readables, start, end , steps,latency_ms, controller) scan.before_read=before_read scan.after_read=after_read scan.setPlotName(plot) scan.start() return scan.getResult() def escan(name, plot=None): """Epics Scan: execute an Epics Scan Record. Args: name(str): Name of scan record. plot(str, optional): plotting context name. Returns: ArrayList of ScanRecord objects. """ scan = EpicsScan(name, controller) scan.setPlotName(plot) scan.start() return scan.getResult() class ListReader(Readable): def __init__(self,list, plot): self.counter=0 self.list=list def getName(self): return None def read(self): if (self.counter>=len(self.list)): return None; ret = self.list[self.counter] self.counter=self.counter+1 return ret def plot(data, plot=None): """Plot a list in a graph. Args: data(list): data to be plotted plot(str, optional): plotting context name. """ latency_ms=0 data=toList(data) writables=[] readables=[ListReader(data,plot),] start=[0,] end=[len(data),] steps=len(data)-1 scan = LineScan(writables,readables, start, end , steps,latency_ms, controller) scan.setPlotName(plot) scan.start() channel_types = { 'b': "java.lang.Byte", 'i': "java.lang.Short", 'l': "java.lang.Integer", 'd': "java.lang.Double", 's': "java.lang.String", 'b,': "[B", 'i,': "[S", 'l,': "[I", 'd,': "[D", 's,': "[Ljava.lang.String;", } array_types = { int: "i", long: "l", float:"d", str:Class.forName("java.lang.String"), } def caget(name, type='d', size=None): """Reads an Epics PV. Args: name(str): PV name type(str, optional): type of PV, defaults 'd'. Scalar values: 'b', 'i', 'l', 'd', 's'. Array: values: 'b,', 'i,', 'l,', 'd,', 's,'. """ if (size==None): return Epics.get(name, Class.forName(channel_types.get(type,type))) else: return Epics.get(name, Class.forName(channel_types.get(type,type)),size) def adjustPutVal(value): if isinstance(value,tuple): value = list(value) if isinstance(value,list): array = PyArray(array_types.get(type(value[0]),'d')) array.fromlist(value) value=array return value def caput(name, value): """Writes to an Epics PV. Args: name(str): PV name value(scalar, string or array): new PV value. """ value=adjustPutVal(value) return Epics.put(name, value) def caputq(name, value): """Writes to an Epics PV and does not wait. Args: name(str): PV name value(scalar, string or array): new PV value. """ value=adjustPutVal(value) return Epics.putq(name, value) def log(log): """Writes log to data file. Args: log(str): Log string """ controller.getDataManager().addLog(str(log)) def startBackgroudTask(script, interval): interval_ms=int(interval*1000) controller.taskManager.create(script,interval_ms) controller.taskManager.start(script) def stopBackgroudTask(script): controller.taskManager.remove(script) execfile(controller.setup.getScriptPath()+"/local.py")