""" Run as: pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1'" or with additional parameters: pshell_be -l -test -c -f="users/edwin/correlation_stream.py" -args="PREFIX:'SLAAR-LBSTREAM1',VERBOSE:False,TIME_INTERVAL:2.0" """ if not "PREFIX" in globals(): PREFIX = "SLAAR-LBSTREAM1" if not "VERBOSE" in globals(): VERBOSE = True if not "TIME_INTERVAL" in globals(): TIME_INTERVAL = 1.0 EMPTY_VALUE = 1112223330 if PREFIX[-1] != ":": PREFIX = PREFIX + ":" def get_channel_list(): ch=[] try: for i in range(20): v= caget(PREFIX + "NAME" + str(i+1)) if v.strip()=="": break ch.append(str(v.strip())) except: pass return ch def ack_channel_list(channel_list): try: s="" for c in channel_list: s=s+c+"\n" s=s+"\x00" caput(PREFIX + "PACK_LIST",[ord(c) for c in s]) except: pass def start_stream(channel_list): try: st=None print "Starting stream" if len(channel_list)==0: raise Exception ("No channel defined") st = Stream("pulse_id", dispatcher) for c in channel_list: st.addScalar(c, c, 1, 0) st.initialize() st.start() st.waitCacheChange(10000) #Wait stream be running before starting scan if st.take() is None: raise Exception("Error initializing data stream") ack_channel_list(channel_list) print "Stream started" except: stop_stream(st) print sys.exc_info()[1] return st def stop_stream(st): try: if st: st.close() st=None print "Stream closed" except: print sys.exc_info()[1] finally: ack_channel_list([]) #def on_start_change(val): # fork(start_stream if (val==1) else stop_stream) #start = Channel(PREFIX + "START_STOP", type = 'i', monitored = True, callback=on_start_change) t0=None pid=None def handle_message(msg, channel_list, buf): global t0, pid if t0 is None: t0 = time.time() now=time.time() num_channels = len(channel_list) size_buffer= 300 #int((len(buf)-100)/(len(channel_list)+1)) buf[0] = msg.pulseId buf[1] = num_channels buf[4] = EMPTY_VALUE if (pid is not None) and ((pid+1) != msg.pulseId): print "Missing pid: ", (pid+1) pid = msg.pulseId #Count if buf[2] < size_buffer: buf[2] = buf[2]+1 #Index buf[3] = buf[3]+1 if buf[3] == size_buffer: buf[3]=0 index = int(buf[3])*(num_channels+1) + 10 buf[index] = msg.pulseId index = index+1 for v in msg.values(): buf[index] = EMPTY_VALUE if ((v is None) or (math.isnan(v)))else v index = index+1 if now >= (t0 + TIME_INTERVAL): if VERBOSE: print to_list(buf[0:5]) t0 = time.time() return True return False def run(): channel_list = [] st=None start = Channel(PREFIX + "START_STOP", type = 'i') outp = Channel(PREFIX + "PACK_DATA", type = '[d') buf = to_array([0]*outp.get_size(),'d') buf[2] = 0 buf[3]= -1 try: while True: started = (start.get()==1) if started and not st: print "Started" channel_list=get_channel_list() print "Channel list: ", channel_list st = start_stream(channel_list) class StreamListener (DeviceListener): def onValueChanged(self, device, value, former): if handle_message(value, channel_list, buf): outp.putq(buf) listener = StreamListener() st.addListener(listener) elif st and not started: print "Stopped" channel_list=[] stop_stream(st) st = None """ if st: st.waitCacheChange(0) if handle_message(st.take(), channel_list, buf): outp.putq(buf) time.sleep(0.001) else: time.sleep(0.1) """ time.sleep(0.1) finally: stop_stream(st) if outp: outp.close() if start: start.close() if __name__ == "__main__": run()