Files
dev/script/correlation_stream.py
2021-04-28 09:29:19 +02:00

162 lines
4.6 KiB
Python

"""
Run as:
pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1'"
or with additional parameters:
pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1',VERBOSE:False,TIME_INTERVAL:2.0"
"""
# Script parameters. Each one may already be present in globals() when the
# script is launched with -args (see the module docstring); the values below
# are used only when the name has not been supplied.
if "PREFIX" not in globals():
    PREFIX = "SLAAR-LBSTREAM1"
if "VERBOSE" not in globals():
    VERBOSE = True
if "TIME_INTERVAL" not in globals():
    TIME_INTERVAL = 1.0

# Placeholder written into the output buffer where no valid number is available.
EMPTY_VALUE = 1112223330

# Channel names are built as PREFIX + suffix, so make sure PREFIX ends with ":".
if PREFIX[-1] != ":":
    PREFIX += ":"
def get_channel_list():
ch=[]
try:
for i in range(20):
v= caget(PREFIX + "NAME" + str(i+1))
if v.strip()=="":
break
ch.append(str(v.strip()))
except:
pass
return ch
def ack_channel_list(channel_list):
try:
s=""
for c in channel_list:
s=s+c+"\n"
s=s+"\x00"
caput(PREFIX + "PACK_LIST",[ord(c) for c in s])
except:
pass
def start_stream(channel_list):
try:
st=None
print "Starting stream"
if len(channel_list)==0:
raise Exception ("No channel defined")
st = Stream("pulse_id", dispatcher)
for c in channel_list:
st.addScalar(c, c, 1, 0)
st.initialize()
st.start()
st.waitCacheChange(10000) #Wait stream be running before starting scan
if st.take() is None:
raise Exception("Error initializing data stream")
ack_channel_list(channel_list)
print "Stream started"
except:
stop_stream(st)
print sys.exc_info()[1]
return st
def stop_stream(st):
try:
if st:
st.close()
st=None
print "Stream closed"
except:
print sys.exc_info()[1]
finally:
ack_channel_list([])
#def on_start_change(val):
# fork(start_stream if (val==1) else stop_stream)
#start = Channel(PREFIX + "START_STOP", type = 'i', monitored = True, callback=on_start_change)
t0=None
pid=None
def handle_message(msg, channel_list, buf):
global t0, pid
if t0 is None:
t0 = time.time()
now=time.time()
num_channels = len(channel_list)
size_buffer= 300 #int((len(buf)-100)/(len(channel_list)+1))
buf[0] = msg.pulseId
buf[1] = num_channels
buf[4] = EMPTY_VALUE
if (pid is not None) and ((pid+1) != msg.pulseId):
print "Missing pid: ", (pid+1)
pid = msg.pulseId
#Count
if buf[2] < size_buffer:
buf[2] = buf[2]+1
#Index
buf[3] = buf[3]+1
if buf[3] == size_buffer:
buf[3]=0
index = int(buf[3])*(num_channels+1) + 10
buf[index] = msg.pulseId
index = index+1
for v in msg.values():
buf[index] = EMPTY_VALUE if ((v is None) or (math.isnan(v)))else v
index = index+1
if now >= (t0 + TIME_INTERVAL):
if VERBOSE:
print to_list(buf[0:5])
t0 = time.time()
return True
return False
def run():
channel_list = []
st=None
start = Channel(PREFIX + "START_STOP", type = 'i')
outp = Channel(PREFIX + "PACK_DATA", type = '[d')
buf = to_array([0]*outp.get_size(),'d')
buf[2] = 0
buf[3]= -1
try:
while True:
started = (start.get()==1)
if started and not st:
print "Started"
channel_list=get_channel_list()
print "Channel list: ", channel_list
st = start_stream(channel_list)
class StreamListener (DeviceListener):
def onValueChanged(self, device, value, former):
if handle_message(value, channel_list, buf):
outp.putq(buf)
listener = StreamListener()
st.addListener(listener)
elif st and not started:
print "Stopped"
channel_list=[]
stop_stream(st)
st = None
"""
if st:
st.waitCacheChange(0)
if handle_message(st.take(), channel_list, buf):
outp.putq(buf)
time.sleep(0.001)
else:
time.sleep(0.1)
"""
time.sleep(0.1)
finally:
stop_stream(st)
if outp: outp.close()
if start: start.close()
if __name__ == "__main__":
run()