Files
tst/script/test_channels.py
2026-03-13 15:28:30 +01:00

90 lines
2.6 KiB
Python

# Create the shared PV cache only when the global namespace does not already
# hold one, so an existing cache (e.g. left by an earlier run in the same
# interpreter) is reused instead of being replaced.
if "_pvs" not in globals():
    _pvs = {}
def get_waveform_pv(name, typ='double', size=None):
    """Return the cached waveform register for ``name``, creating it on first use.

    On the first request for a name, a ``Waveform`` register is created,
    initialized and wrapped in a ``CAS`` object; both are stored in the
    module-level ``_pvs`` cache so later calls with the same name return the
    same register.

    Args:
        name: Cache key; also passed to the register and to ``CAS``.
        typ: Type string forwarded to ``CAS``.
        size: Currently not applied (the ``fixSize(size)`` call below is
            disabled); kept so existing callers keep working.

    Returns:
        The register object stored in ``_pvs`` for ``name``.
    """
    if name not in _pvs:
        class Waveform(RegisterBase, RegisterArray):
            """Software register holding a list of values in memory."""

            def __init__(self, name):
                RegisterBase.__init__(self, name)
                self.fixed_size = None
                self.clear()

            def clear(self):
                """Reset the stored value to ``None`` (no data)."""
                self.val = None

            def doRead(self):
                """Return the stored value (``None`` when cleared)."""
                return self.val

            def doWrite(self, val):
                """Store ``val``; ``None`` or an empty sequence clears the register.

                When a fixed size is set, the stored list always has exactly
                that many elements: the input is truncated, or padded with
                NaN. The caller's sequence is never modified.
                """
                if val is None or len(val) == 0:
                    self.clear()
                elif self.fixed_size:
                    # Work on a copy: the previous code deleted elements from
                    # the caller's list and then sliced into ``None``.
                    kept = list(val)[:self.fixed_size]
                    padding = [float("nan")] * (self.fixed_size - len(kept))
                    self.val = kept + padding
                else:
                    self.val = val
                # Single-argument parenthesized form prints the same text
                # whether ``print`` is a statement or a function.
                print(self.val)

            def getSize(self):
                """Return the number of stored elements (0 when cleared)."""
                if self.val is None:
                    return 0
                return len(self.val)

            def fixSize(self, size):
                """Set the fixed length used by ``doWrite`` and clear the value."""
                self.fixed_size = size
                self.clear()

        waveform = Waveform(name)
        # NOTE(review): ``size`` is not applied; the call stays disabled as in
        # the original script.
        #waveform.fixSize(size)
        waveform.initialize()
        cas = CAS(name, waveform, typ)
        # Keep the CAS object next to the register so both stay referenced.
        _pvs[name] = [waveform, cas]
    return _pvs[name][0]
def get_scalar_pv(name, typ='double', size=None):
    """Return the cached scalar register for ``name``, creating it on first use.

    The register and its ``CAS`` wrapper are stored together in the
    module-level ``_pvs`` cache, which is keyed by ``name`` only; a name that
    is already cached is returned as-is.

    Args:
        name: Cache key; also passed to the register and to ``CAS``.
        typ: Type string forwarded to ``CAS``.
        size: Accepted but not used by this function.

    Returns:
        The register object stored in ``_pvs`` for ``name``.
    """
    # Fast path: the name was registered before.
    if name in _pvs:
        return _pvs[name][0]

    class Scalar(RegisterBase):
        """Software register holding a single value in memory."""

        def __init__(self, name):
            RegisterBase.__init__(self, name)
            self.fixed_size = None
            self.clear()

        def clear(self):
            """Reset the stored value to NaN."""
            self.val = float("nan")

        def doRead(self):
            """Return the stored value."""
            return self.val

        def doWrite(self, val):
            """Store ``val``; writing ``None`` resets the register."""
            if val is not None:
                self.val = val
                return
            self.clear()

    register = Scalar(name)
    register.initialize()
    server = CAS(name, register, typ)
    # Keep the CAS object next to the register so both stay referenced.
    _pvs[name] = [register, server]
    return register
# Test PVs written by the scan below: a waveform register and a scalar
# register, both requested with type "double".
pv = get_waveform_pv("TEST_CHANNELS:ai1", typ="double")
sc = get_scalar_pv("TEST_CHANNELS:ai2", typ="double")
def after_read(record, scan):
    """Scan callback: mirror the current scan state into the test PVs.

    Writes ``scan.result[ai1]`` to the waveform PV and ``record.index`` to
    the scalar PV. Passed to ``tscan`` through its ``after_read`` argument.
    """
    # presumably the ai1 values collected so far -- confirm against the scan API
    collected = scan.result[ai1]
    pv.write(collected)
    current_index = record.index
    sc.write(current_index)
# Run the scan on ai1 (listed twice) without saving, updating the PVs after
# each read through the after_read callback.
# NOTE(review): 5 and 0.1 are presumably the number of points and the
# interval in seconds -- confirm against the tscan documentation.
r = tscan((ai1, ai1), 5, 0.1, after_read=after_read, save=False)

# Publish the final result: the ai1 entry of the result in the waveform PV and
# the result length in the scalar PV, then echo both PVs.
pv.write(r[ai1])
sc.write(len(r))
print(pv.read())
print(sc.read())