Update
This commit is contained in:
162
script/correlation_stream.py
Normal file
162
script/correlation_stream.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""
|
||||
Run as:
|
||||
pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1'"
|
||||
|
||||
or with additional parameters:
|
||||
pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1',VERBOSE:False,TIME_INTERVAL:2.0"
|
||||
"""
|
||||
|
||||
if not "PREFIX" in globals():
|
||||
PREFIX = "SLAAR-LBSTREAM1"
|
||||
if not "VERBOSE" in globals():
|
||||
VERBOSE = True
|
||||
if not "TIME_INTERVAL" in globals():
|
||||
TIME_INTERVAL = 1.0
|
||||
|
||||
EMPTY_VALUE = 1112223330
|
||||
|
||||
|
||||
if PREFIX[-1] != ":":
|
||||
PREFIX = PREFIX + ":"
|
||||
|
||||
def get_channel_list():
|
||||
ch=[]
|
||||
try:
|
||||
for i in range(20):
|
||||
v= caget(PREFIX + "NAME" + str(i+1))
|
||||
if v.strip()=="":
|
||||
break
|
||||
ch.append(str(v.strip()))
|
||||
except:
|
||||
pass
|
||||
return ch
|
||||
|
||||
def ack_channel_list(channel_list):
|
||||
try:
|
||||
s=""
|
||||
for c in channel_list:
|
||||
s=s+c+"\n"
|
||||
s=s+"\x00"
|
||||
caput(PREFIX + "PACK_LIST",[ord(c) for c in s])
|
||||
except:
|
||||
pass
|
||||
|
||||
def start_stream(channel_list):
|
||||
try:
|
||||
st=None
|
||||
print "Starting stream"
|
||||
if len(channel_list)==0:
|
||||
raise Exception ("No channel defined")
|
||||
st = Stream("pulse_id", dispatcher)
|
||||
for c in channel_list:
|
||||
st.addScalar(c, c, 1, 0)
|
||||
st.initialize()
|
||||
st.start()
|
||||
st.waitCacheChange(10000) #Wait stream be running before starting scan
|
||||
if st.take() is None:
|
||||
raise Exception("Error initializing data stream")
|
||||
ack_channel_list(channel_list)
|
||||
print "Stream started"
|
||||
except:
|
||||
stop_stream(st)
|
||||
print sys.exc_info()[1]
|
||||
return st
|
||||
|
||||
def stop_stream(st):
|
||||
try:
|
||||
if st:
|
||||
st.close()
|
||||
st=None
|
||||
print "Stream closed"
|
||||
except:
|
||||
print sys.exc_info()[1]
|
||||
finally:
|
||||
ack_channel_list([])
|
||||
|
||||
#def on_start_change(val):
|
||||
# fork(start_stream if (val==1) else stop_stream)
|
||||
#start = Channel(PREFIX + "START_STOP", type = 'i', monitored = True, callback=on_start_change)
|
||||
|
||||
t0=None
|
||||
pid=None
|
||||
def handle_message(msg, channel_list, buf):
|
||||
global t0, pid
|
||||
if t0 is None:
|
||||
t0 = time.time()
|
||||
now=time.time()
|
||||
num_channels = len(channel_list)
|
||||
size_buffer= 300 #int((len(buf)-100)/(len(channel_list)+1))
|
||||
buf[0] = msg.pulseId
|
||||
buf[1] = num_channels
|
||||
buf[4] = EMPTY_VALUE
|
||||
if (pid is not None) and ((pid+1) != msg.pulseId):
|
||||
print "Missing pid: ", (pid+1)
|
||||
pid = msg.pulseId
|
||||
#Count
|
||||
if buf[2] < size_buffer:
|
||||
buf[2] = buf[2]+1
|
||||
#Index
|
||||
buf[3] = buf[3]+1
|
||||
if buf[3] == size_buffer:
|
||||
buf[3]=0
|
||||
index = int(buf[3])*(num_channels+1) + 10
|
||||
buf[index] = msg.pulseId
|
||||
index = index+1
|
||||
for v in msg.values():
|
||||
buf[index] = EMPTY_VALUE if ((v is None) or (math.isnan(v)))else v
|
||||
index = index+1
|
||||
if now >= (t0 + TIME_INTERVAL):
|
||||
if VERBOSE:
|
||||
print to_list(buf[0:5])
|
||||
t0 = time.time()
|
||||
return True
|
||||
return False
|
||||
|
||||
def run():
|
||||
channel_list = []
|
||||
st=None
|
||||
start = Channel(PREFIX + "START_STOP", type = 'i')
|
||||
outp = Channel(PREFIX + "PACK_DATA", type = '[d')
|
||||
buf = to_array([0]*outp.get_size(),'d')
|
||||
buf[2] = 0
|
||||
buf[3]= -1
|
||||
|
||||
try:
|
||||
while True:
|
||||
started = (start.get()==1)
|
||||
if started and not st:
|
||||
print "Started"
|
||||
channel_list=get_channel_list()
|
||||
print "Channel list: ", channel_list
|
||||
st = start_stream(channel_list)
|
||||
|
||||
class StreamListener (DeviceListener):
|
||||
def onValueChanged(self, device, value, former):
|
||||
if handle_message(value, channel_list, buf):
|
||||
outp.putq(buf)
|
||||
listener = StreamListener()
|
||||
st.addListener(listener)
|
||||
|
||||
elif st and not started:
|
||||
print "Stopped"
|
||||
channel_list=[]
|
||||
stop_stream(st)
|
||||
st = None
|
||||
"""
|
||||
if st:
|
||||
st.waitCacheChange(0)
|
||||
if handle_message(st.take(), channel_list, buf):
|
||||
outp.putq(buf)
|
||||
time.sleep(0.001)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
"""
|
||||
time.sleep(0.1)
|
||||
finally:
|
||||
stop_stream(st)
|
||||
if outp: outp.close()
|
||||
if start: start.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
Reference in New Issue
Block a user