Files
dev/script/test/TestMonitor.py
2023-05-01 11:28:04 +02:00

125 lines
3.4 KiB
Python

###################################################################################################
# Example of saving diagostics and snapshots during a scan
###################################################################################################
DIAGS = [ai2]
SNAPS = [ai3, wf1]
ret = lscan(m1, ai1, 0.0, 1.0, 4, diags=DIAGS, snaps=SNAPS)
plot(ret.getSnap(wf1))
plot(ret.getDiag(ai2))
#All devices can be directly indexd
for dev in [m1, ai1, ai2, wf1, ai3]:
print dev.name, " -> ", ret[dev]
# Changing default folders: override the LayoutBase path attributes for snapshots,
# diagnostics, monitors and logs before running further scans.
# NOTE(review): presumably these are the locations inside the data file where the scans
# below store the corresponding records -- confirm against the LayoutBase documentation.
LayoutBase.PATH_SNAPS = "initial/"
LayoutBase.PATH_DIAGS = "status/"
LayoutBase.PATH_MONITORS = "events/"
LayoutBase.PATH_LOGS = "logbook/"
# Same lscan call as earlier in this script, now executed with the overridden paths.
ret = lscan(m1, ai1, 0.0, 1.0, 4, diags=DIAGS, snaps=SNAPS)
# Run a tscan on `out` with diagnostic, monitor and snapshot devices. `ret` is rebound here,
# so the lscan result assigned on the previous statement is not used.
# NOTE(review): this call passes `snapshots=` whereas the lscan calls pass `snaps=` --
# confirm which keyword tscan actually expects.
ret = tscan(out, 10, 0.2, diags=[inp, arr], monitors=[inp, sin], snapshots=[inp, motor])
#set_exec_pars(flush=True)
#print ret[out]
# Index the scan result by device object (sin, motor) and by the string "arr".
print ret[sin]
print ret["arr"]
print ret[motor]
# Disabled alternatives kept for reference (accessing diagnostics, monitor-only scan, plots):
#print ret.getDiag(sin)
#print ret.getDiag(arr)
#ret = tscan(out, 10, 0.2, monitors=[sin, arr])
#plot(ret[sin][1], xdata=ret[sin][0])
#plot(ret[out], xdata=ret.timestamps)
# Disabled example: everything below is a bare string literal, so none of it is executed.
# The text defines an OtfValue register class and an OTF device with two OtfValue components
# ("Channel1", "Channel2") that are updated from a forked task, then registers the device as
# "otf" and runs a tscan with monitors=[otf].
# NOTE(review): the snippet is stored without indentation and must be re-indented before
# being re-enabled.
# NOTE(review): timestamps are computed as time.time()*10e6; 10e6 equals 1e7 -- confirm the
# intended timestamp unit.
"""
class OtfValue(ReadonlyAsyncRegisterBase):
def __init__(self, name):
ReadonlyAsyncRegisterBase.__init__(self, name)
def setValue(self, value,timestamp):
self.setCache(value,timestamp)
class OTF(Otf):
def __init__(self, name):
Otf.__init__(self, name)
self.addComponent(OtfValue("Channel1"))
self.addComponent(OtfValue("Channel2"))
self.components[0].setValue(float("NaN"), long(time.time()*10e6))
self.components[1].setValue(-1, long(time.time()*10e6))
self.initialize()
self.interrupted = False
def task(self):
start = time.time()
finished=False
while not self.interrupted and not finished:
time.sleep(0.1)
self.components[0].setValue(time.time(), long(time.time()*10e6))
finished = (time.time()-start)> 5.0
if not self.interrupted:
for i in range(20):
self.components[1].setValue(i, long(time.time()*10e6))
self.getLogger().info("Finished OTF")
self.state=State.Ready
def start(self):
self.state=State.Busy
self.interrupted = False
self.thread = fork(self.task)
def abort(self):
self.interrupted = True
self.getLogger().info("Interrupting OTF thread")
join(self.thread)
add_device(OTF("otf"), True)
ret = tscan(out, 10, 0.2, monitors=[otf])
"""
# Disabled example: everything below is a bare string literal, so none of it is executed.
# The text defines an OTF device that updates its own cache (setCache) from a forked task
# for about 5 seconds, registers it as "otf", runs a tscan with monitors=[otf] and plots
# ret[otf][1] against ret[otf][0].
# NOTE(review): the snippet is stored without indentation and must be re-indented before
# being re-enabled.
# NOTE(review): timestamps are computed as time.time()*10e6; 10e6 equals 1e7 -- confirm the
# intended timestamp unit.
"""
class OTF(Otf):
def __init__(self, name):
Otf.__init__(self, name)
self.setCache(float("NaN"), long(time.time()*10e6))
self.initialize()
self.interrupted = False
def task(self):
start = time.time()
finished=False
while not self.interrupted and not finished:
time.sleep(0.1)
self.setCache(time.time(), long(time.time()*10e6))
finished = (time.time()-start)> 5.0
self.getLogger().info("Finished OTF")
self.state=State.Ready
def start(self):
self.state=State.Busy
self.interrupted = False
self.thread = fork(self.task)
def abort(self):
self.interrupted = True
self.getLogger().info("Interrupting OTF thread")
join(self.thread)
add_device(OTF("otf"), True)
ret = tscan(out, 10, 0.2, monitors=[otf])
plot(ret[otf][1], xdata=ret[otf][0])
"""
# Finish by calling set_return with None.
# NOTE(review): presumably this sets the value the script reports to its caller -- confirm
# against the scripting API documentation.
set_return(None)