from ch.psi.pshell.bs import Stream from ch.psi.pshell.bs import Provider from ch.psi.pshell.bs.ProviderConfig import SocketType; providers, streams = [], [] for i in range(9000, 9008): p = Provider("Provider"+str(i),"tcp://SFTEST-CVME-DBPM1:"+str(i), SocketType.PULL) p.initialize() providers.append(p) s = Stream("Stream"+str(i), p) s.initialize() streams.append(s) def check(s): print "Starting checking " + s.getName() divider = 3 if s.name == "Stream9000" else 1 pid = s.take().pulseId while(True): s.waitCacheChange(10000) p=s.take().pulseId if p!= (pid + divider): print s.getName() + " error: received pid %d, expecting %d" % (p, (pid + 1)) pid = p class StreamListener (DeviceListener): def onCacheChanged(self, device, value, former, timestamp, valueChange): if value.pulseId != (self.pid + self.divider): print s.getName() + " error: received pid %d, expecting %d" % (value.pulseId, (self.pid+ 1)) self.pid = value.pulseId try: for s in streams: s.start(True) s.waitCacheChange(5000) print s.getName() + " channels:" + str(s.getIdentifiers()) """ args = [] for s in streams: args.append((check,(s,))) parallelize(*args) #check(streams[0]) """ for s in streams: l = StreamListener() l.divider = 3 if s.name == "Stream9000" else 1 l.pid = s.take().pulseId s.addListener(l) sleep(36000) finally: try: for s in streams: s.close() except: pass