66 lines
1.6 KiB
Python
66 lines
1.6 KiB
Python
from ch.psi.pshell.bs import Stream
|
|
from ch.psi.pshell.bs import Provider
|
|
from ch.psi.pshell.bs.ProviderConfig import SocketType;
|
|
|
|
|
|
# Build one provider/stream pair per TCP port in 9000..9007.
# Both lists are module-level so the rest of the script can start, check and
# close the streams later on.
providers, streams = [], []
for i in range(9000, 9008):
    # Each provider is created with the PULL socket type on host SFTEST-CVME-DBPM1.
    p = Provider("Provider%d" % i, "tcp://SFTEST-CVME-DBPM1:%d" % i, SocketType.PULL)
    p.initialize()
    providers.append(p)

    # The stream is bound to the provider created just above.
    s = Stream("Stream%d" % i, p)
    s.initialize()
    streams.append(s)
|
|
|
|
def check(s):
    """Poll a stream forever and report gaps in its pulse-id sequence.

    The expected increment between two consecutive pulse ids is 3 for the
    stream named "Stream9000" and 1 for every other stream. Each time the
    received pulse id differs from ``previous + increment`` an error line is
    printed; checking then continues from the received value.

    Args:
        s: stream object providing ``getName()``, ``take()`` (returning an
            object with a ``pulseId`` attribute) and ``waitCacheChange()``.

    This function only returns if one of the stream calls raises.
    """
    name = s.getName()
    print("Starting checking " + name)
    divider = 3 if name == "Stream9000" else 1
    pid = s.take().pulseId
    while True:
        # NOTE(review): 10000 is presumably a timeout in milliseconds - confirm.
        s.waitCacheChange(10000)
        p = s.take().pulseId
        expected = pid + divider
        if p != expected:
            # Report the value that was actually compared against
            # (pid + divider), not a hard-coded pid + 1.
            print(name + " error: received pid %d, expecting %d" % (p, expected))
        pid = p
|
|
|
|
class StreamListener(DeviceListener):
    """Device listener that checks pulse-id continuity on cache updates.

    Attributes:
        divider: expected pulse-id increment between consecutive values
            (defaults to 1; the caller may override it per stream).
        pid: pulse id of the last value seen, or None when no value has been
            recorded yet (the caller may seed it before registering).
    """

    divider = 1
    pid = None

    def onCacheChanged(self, device, value, former, timestamp, valueChange):
        """Compare the new value's pulse id with the expected one.

        Args:
            device: the device that fired the callback; its name is used in
                the error message.
            value: the new cached value; must expose ``pulseId``.
            former, timestamp, valueChange: unused here.
        """
        received = value.pulseId
        # Without a previous pulse id there is nothing to compare against.
        if self.pid is not None:
            expected = self.pid + self.divider
            if received != expected:
                print(device.getName() + " error: received pid %d, expecting %d" % (received, expected))
        # Store on the instance so the next callback compares against it.
        self.pid = received
|
|
|
|
|
|
import sys

try:
    # Start every stream, wait for a cache change, then list its channels.
    # NOTE(review): 5000 is presumably a timeout in milliseconds - confirm.
    for s in streams:
        s.start(True)
        s.waitCacheChange(5000)
        print(s.getName() + " channels:" + str(s.getIdentifiers()))

    # Disabled alternative: run the blocking check() on all streams in
    # parallel instead of using listeners.
    # args = []
    # for s in streams:
    #     args.append((check, (s,)))
    # parallelize(*args)
    # #check(streams[0])

    # Listener mode: seed one listener per stream with the expected pulse-id
    # increment and the current pulse id, then register it on that stream.
    for s in streams:
        listener = StreamListener()
        listener.divider = 3 if s.getName() == "Stream9000" else 1
        listener.pid = s.take().pulseId
        s.addListener(listener)
    # NOTE(review): nothing blocks here, so the finally clause below closes the
    # streams right after the listeners are registered. Add a wait at this
    # point if the listeners are meant to observe data - confirm intent.
finally:
    # Best-effort cleanup: close each stream independently so one failure
    # does not prevent the remaining streams from being closed.
    for s in streams:
        try:
            s.close()
        except:
            # Bare except kept on purpose: under Jython, Java exceptions may
            # not be caught by "except Exception" - TODO confirm.
            print("Error closing " + s.getName() + ": " + str(sys.exc_info()[1]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|