class CorrelationStream(): def __init__(self, prefix="SLAAR-LBSTREAM1", verbose=False, interval=1.0, empty_value = 1112223330): self.prefix = prefix self.verbose = verbose self.interval = interval self.empty_value = empty_value if self.prefix != ":": self.prefix = self.prefix + ":" self.t0=None self.pid=None self.st=None self.running = False self.task=None def get_channel_list(self): ch=[] try: for i in range(20): v= caget(self.prefix + "NAME" + str(i+1)) if v.strip()=="": break ch.append(str(v.strip())) except: pass return ch def ack_channel_list(self, channel_list): try: s="" for c in channel_list: s=s+c+"\n" s=s+"\x00" caput(self.prefix + "PACK_LIST",[ord(c) for c in s]) except: pass def start_stream(self, channel_list): try: self.st=None log("Starting stream: " + str(self.prefix)) if len(channel_list)==0: raise Exception ("No channel defined") self. st = Stream("pulse_id", dispatcher) self.st.setIncomplete("fill_null") for c in channel_list: self.st.addScalar(c, c, 1, 0) self.st.initialize() self.st.start() self.st.waitCacheChange(10000) #Wait stream be running before starting scan if self.st.take() is None: raise Exception("Error initializing data stream") self.ack_channel_list(channel_list) log("Stream started: " + str(self.prefix)) except: self.stop_stream() self.st=None log(sys.exc_info()[1]) def stop_stream(self): log("Closing stream: " + str(self.prefix)) try: if self.st: self.st.close() log("Stream closed: " + str(self.prefix)) except: log(sys.exc_info()[1]) finally: self.st=None self.ack_channel_list([]) def handle_message(self, msg, channel_list, buf): if self.t0 is None: self.t0 = time.time() now=time.time() num_channels = len(channel_list) size_buffer= 300 #int((len(buf)-100)/(len(channel_list)+1)) buf[0] = msg.pulseId buf[1] = num_channels buf[4] = self.empty_value #if (self.pid is not None) and ((self.pid+1) != msg.pulseId): # msg("Missing pid: +str(self.pid+1)) self.pid = msg.pulseId #Count if buf[2] < size_buffer: buf[2] = buf[2]+1 #Index buf[3] = buf[3]+1 if buf[3] == size_buffer: buf[3]=0 index = int(buf[3])*(num_channels+1) + 10 buf[index] = msg.pulseId index = index+1 for v in msg.values(): buf[index] = self.empty_value if ((v is None) or (math.isnan(v)))else v index = index+1 if now >= (self.t0 + self.interval): if self.verbose: print self.prefix, ":", to_list(buf[0:5]) self.t0 = time.time() return True return False def _run(self): log("Start running thread: " + str(self.prefix)) channel_list = [] try: start_channel = Channel(self.prefix + "START_STOP", type = 'i') output_channel = Channel(self.prefix + "PACK_DATA", type = '[d') buf = to_array([0]*output_channel.get_size(),'d') while self.running: started = (start_channel.get()==1) if started and not self.st: buf[2] = 0 buf[3]= -1 channel_list=self.get_channel_list() log("Started " + str(self.prefix) + " - Channel list: " + str(channel_list) ) self.start_stream(channel_list) if self.st is not None: class StreamListener (DeviceListener): def __init__(self, parent): self.parent=parent def onValueChanged(self, device, value, former): if self.parent.handle_message(value, channel_list, buf): output_channel.putq(buf) listener = StreamListener(self) self.st.addListener(listener) elif self.st and not started: log("Stopped " + str(self.prefix)) channel_list=[] self.stop_stream() time.sleep(0.1) except: log(sys.exc_info()[1]) finally: log("Stop running thread: " + str(self.prefix)) self.stop_stream() if output_channel: output_channel.close() if start_channel: start_channel.close() self.running = False def run(self): if self.running: return self.running = True self.task = fork(self._run)[0] def stop(self): if self.running: self.running = False if (self.task is not None): log("Waiting correlation stream thread to stop..." + str(self.prefix)) join([self.task,]) log("Done" + str(self.prefix)) def is_running(self): return (self.task is not None) and not (self.task.isDone()) if False: cs = CorrelationStream(prefix="SLAAR-LBSTREAM2") cs.run() try: while (cs.running): time.sleep(0.10) finally: cs.stop()