Files
sf-op/script/test/TestMergePipeline.py
gobbo_a 8beeb7dbc7
2023-08-17 15:16:51 +02:00

43 lines
1.3 KiB
Python

# Value handed to bscan() as its timeout for each scan
# (presumably seconds -- TODO confirm against the bscan documentation).
SCAN_TIME = 60

#dispatcher.config.mappingIncomplete=ch.psi.pshell.bs.DispatcherConfig.Incomplete.drop

# Stream connected directly to a fixed endpoint; scanned below under the
# tag "Merge".
st1 = Stream("st1", "tcp://sf-daqsync-08.psi.ch:9001", SocketType.SUB)
#st1 = Stream("st1", "tcp://sf-daqsync-02.psi.ch:31852", SocketType.PULL)

# Stream created from the dispatcher; scanned below under the tag
# "Dispatcher".
# NOTE(review): this stream is also given the name "st1" -- possibly a
# copy/paste leftover; confirm whether "st2" was intended.
st2 = Stream("st1", dispatcher)
for _label, _channel in (
    ("B1", "SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"),
    ("B2", "SIN-CVME-TIFGUN-EVR0:BUNCH-2-OK"),
):
    # The trailing 1, 0 are presumably modulo and offset -- TODO confirm.
    st2.addScalar(_label, _channel, 1, 0)

# Per-scan bookkeeping filled in by after_read():
#   pids  -- first field of the most recent record seen by each scan
#   count -- accumulated size of the id gaps detected in each scan
pids = {}
count = {}
def after_read(record, scan):
global pids, count
lastpid=pids.get(scan)
pid=record[0]
if lastpid:
if (lastpid+1)!=pid:
print scan.tag, "received ", pid, " - waiting for ", (lastpid + 1)
count[scan]=count[scan]+(pid - (lastpid+1))
else:
count[scan]=0
pids[scan] = pid
def scan (st, name):
    """Run a bscan on a stream and return its result, always closing it.

    The stream is initialized and started, then waitCacheChange(3000) is
    called (presumably a wait of up to 3000 ms for the first data -- TODO
    confirm) before bscan() is invoked with SCAN_TIME as timeout and
    after_read() as per-record callback.

    Args:
        st: stream to scan.
        name: used both as plot title and as scan tag.

    Returns:
        Whatever bscan() returns.
    """
    st.initialize()
    try:
        # Started inside the try block so that the stream is closed even if
        # starting it or waiting for its first data fails.
        st.start()
        st.waitCacheChange(3000)
        # NOTE(review): enabled_plots always refers to the global st1, even
        # when a different stream is scanned -- confirm this is intended.
        r = bscan(st, records=-1, timeout=SCAN_TIME, save=False, keep=False,
                  enabled_plots=[st1],
                  #manual_range_y=[17449000000, 17551000000],
                  title=name, tag=name, after_read=after_read)
        # Return value unused; the call is kept in case it has side
        # effects -- TODO confirm and remove.
        st.getValues()
        return r
    finally:
        st.close()
r1,r2 = parallelize( (scan,(st1, "Merge")) , \
(scan, (st2, "Dispatcher")))
for sc in count.keys():
print sc.tag, count[sc]