174 lines
6.1 KiB
Python
174 lines
6.1 KiB
Python
|
|
class CorrelationStream():
    """Pack a pulse-id keyed data stream into a flat array of doubles.

    A worker (see ``run``/``_run``) polls ``<prefix>START_STOP``.  While that
    value is 1 it reads the channel names from ``<prefix>NAME1`` ...
    ``<prefix>NAME20``, opens a ``Stream`` on them, and copies every received
    message into a ring buffer that is written to ``<prefix>PACK_DATA``.  The
    list of streamed channels is echoed to ``<prefix>PACK_LIST``.

    Layout of the packed buffer, as written by ``handle_message``:

    * ``buf[0]``  pulse id of the latest message
    * ``buf[1]``  number of channels
    * ``buf[2]``  number of records stored so far (saturates at the ring size)
    * ``buf[3]``  ring index of the latest record
    * ``buf[4]``  the ``empty_value`` placeholder
    * ``buf[10:]`` records of ``num_channels + 1`` doubles each: the pulse id
      followed by one value per channel.

    NOTE(review): ``caget``, ``caput``, ``Stream``, ``Channel``,
    ``DeviceListener``, ``dispatcher``, ``fork``, ``join``, ``log``,
    ``to_array`` and ``to_list`` are not defined in this file; they are
    presumably provided by the hosting scripting environment.
    """

    # Highest NAME<n> index that is queried for a channel name.
    _MAX_CHANNELS = 20
    # Number of records kept in the ring buffer.
    _RING_SIZE = 300
    # Offset of the first record inside the packed buffer.
    _HEADER_SIZE = 10

    def __init__(self, prefix="SLAAR-LBSTREAM1", verbose=False, interval=1.0, empty_value=1112223330):
        """Store the configuration; nothing is started here.

        prefix      -- prefix of all channel names; a trailing ":" is added
                       when it is missing.
        verbose     -- when true, the buffer header is printed each time the
                       buffer is published.
        interval    -- minimum time in seconds between two publications.
        empty_value -- number stored in place of a missing (None) or NaN value.
        """
        self.prefix = prefix
        self.verbose = verbose
        self.interval = interval
        self.empty_value = empty_value
        # Check the last character, not the whole string: the previous test
        # (prefix != ":") turned "X:" into "X::".
        if not self.prefix.endswith(":"):
            self.prefix = self.prefix + ":"

        self.t0 = None       # time of the last publication
        self.pid = None      # pulse id of the last handled message

        self.st = None       # active Stream, or None when not streaming
        self.running = False
        self.task = None     # handle of the forked worker

    def get_channel_list(self):
        """Return the configured channel names as a list of str.

        Reads ``<prefix>NAME1`` onwards and stops at the first blank entry.
        Reading is best effort: on any error the names collected so far are
        returned and the error is logged.
        """
        ch = []
        try:
            for number in range(1, self._MAX_CHANNELS + 1):
                name = caget(self.prefix + "NAME" + str(number)).strip()
                if name == "":
                    break
                ch.append(str(name))
        except:
            # Bare except kept on purpose: under Jython it may be what catches
            # Java exceptions raised by the channel access layer (TODO confirm).
            log(sys.exc_info()[1])
        return ch

    def ack_channel_list(self, channel_list):
        """Write the streamed channel names to ``<prefix>PACK_LIST``.

        The names are sent as character codes, each name followed by a
        newline, and the whole text terminated by a NUL character.  Failures
        are logged and otherwise ignored.
        """
        try:
            text = "".join([c + "\n" for c in channel_list]) + "\x00"
            caput(self.prefix + "PACK_LIST", [ord(c) for c in text])
        except:
            log(sys.exc_info()[1])

    def start_stream(self, channel_list):
        """Open and start a stream on ``channel_list``.

        On success ``self.st`` holds the running stream and the channel list
        has been acknowledged.  On any failure (including an empty channel
        list) the stream is closed, ``self.st`` is left as None and the error
        is logged; no exception reaches the caller.
        """
        try:
            self.st = None
            log("Starting stream: " + str(self.prefix))
            if not channel_list:
                raise Exception("No channel defined")
            self.st = Stream("pulse_id", dispatcher)
            self.st.setIncomplete("fill_null")
            for c in channel_list:
                self.st.addScalar(c, c, 1, 0)
            self.st.initialize()
            self.st.start()
            self.st.waitCacheChange(10000)  # Wait stream be running before starting scan
            if self.st.take() is None:
                raise Exception("Error initializing data stream")
            self.ack_channel_list(channel_list)
            log("Stream started: " + str(self.prefix))
        except:
            # Capture the error before running cleanup code that handles
            # exceptions of its own.
            error = sys.exc_info()[1]
            self.stop_stream()
            self.st = None
            log(error)

    def stop_stream(self):
        """Close the active stream, if any, and acknowledge an empty list."""
        log("Closing stream: " + str(self.prefix))
        try:
            if self.st:
                self.st.close()
            log("Stream closed: " + str(self.prefix))
        except:
            log(sys.exc_info()[1])
        finally:
            self.st = None
            self.ack_channel_list([])

    def handle_message(self, msg, channel_list, buf):
        """Store one stream message in ``buf`` and tell whether to publish.

        msg          -- message providing ``pulseId`` and ``values()``.
        channel_list -- names of the streamed channels; only its length is used.
        buf          -- packed buffer, modified in place (layout in the class
                        docstring).  It is not checked to be large enough for
                        the fixed ring size.

        Returns True when at least ``interval`` seconds passed since the last
        publication (the timer is then restarted), False otherwise.
        """
        now = time.time()
        if self.t0 is None:
            self.t0 = now
        num_channels = len(channel_list)
        size_buffer = self._RING_SIZE  # fixed; not derived from len(buf)

        buf[0] = msg.pulseId
        buf[1] = num_channels
        buf[4] = self.empty_value
        # Only recorded; gaps between consecutive pulse ids are not reported.
        self.pid = msg.pulseId

        # Count of stored records, saturating at the ring size.
        if buf[2] < size_buffer:
            buf[2] = buf[2] + 1
        # Ring index of the record being written.
        buf[3] = buf[3] + 1
        if buf[3] == size_buffer:
            buf[3] = 0

        index = int(buf[3]) * (num_channels + 1) + self._HEADER_SIZE
        buf[index] = msg.pulseId
        index = index + 1
        for v in msg.values():
            buf[index] = self.empty_value if ((v is None) or (math.isnan(v))) else v
            index = index + 1

        if now >= (self.t0 + self.interval):
            if self.verbose:
                # Same text as the former ``print a, ":", b`` statement, in a
                # form that is valid on both Python 2 and Python 3.
                print("%s : %s" % (self.prefix, to_list(buf[0:5])))
            self.t0 = time.time()
            return True
        return False

    def _run(self):
        """Worker loop: follow START_STOP and publish the packed buffer.

        Runs until ``self.running`` becomes false or an error occurs.  The
        stream and both channels are always closed on exit and
        ``self.running`` is reset to False.
        """
        log("Start running thread: " + str(self.prefix))
        channel_list = []
        # Bound before the try block so that the finally clause can test them
        # even when opening a channel fails.
        start_channel = None
        output_channel = None

        try:
            start_channel = Channel(self.prefix + "START_STOP", type='i')
            output_channel = Channel(self.prefix + "PACK_DATA", type='[d')
            buf = to_array([0] * output_channel.get_size(), 'd')
            while self.running:
                started = (start_channel.get() == 1)
                if started and not self.st:
                    # Reset record count and ring index for the new run.
                    buf[2] = 0
                    buf[3] = -1
                    channel_list = self.get_channel_list()
                    log("Started " + str(self.prefix) + " - Channel list: " + str(channel_list))
                    self.start_stream(channel_list)
                    if self.st is not None:
                        class StreamListener (DeviceListener):
                            def __init__(self, parent):
                                self.parent = parent

                            def onValueChanged(self, device, value, former):
                                # Publish only when handle_message says the
                                # interval has elapsed.
                                if self.parent.handle_message(value, channel_list, buf):
                                    output_channel.putq(buf)
                        listener = StreamListener(self)
                        self.st.addListener(listener)

                elif self.st and not started:
                    log("Stopped " + str(self.prefix))
                    channel_list = []
                    self.stop_stream()
                time.sleep(0.1)
        except:
            log(sys.exc_info()[1])
        finally:
            log("Stop running thread: " + str(self.prefix))
            self.stop_stream()
            if output_channel is not None:
                output_channel.close()
            if start_channel is not None:
                start_channel.close()
            self.running = False

    def run(self):
        """Start the worker through ``fork``; does nothing if already running."""
        if self.running:
            return
        self.running = True
        self.task = fork(self._run)[0]

    def stop(self):
        """Ask the worker to stop and wait for it to finish."""
        if self.running:
            self.running = False
            if (self.task is not None):
                log("Waiting correlation stream thread to stop..." + str(self.prefix))
                join([self.task,])
                log("Done" + str(self.prefix))

    def is_running(self):
        """Return True while a worker task exists and is not done."""
        return (self.task is not None) and not (self.task.isDone())
|
|
|
|
|
|
# Manual demo, switched off by the constant condition below: it would start a
# CorrelationStream on the "SLAAR-LBSTREAM2" prefix, idle while it is running,
# and always stop it on the way out.
if False:
    demo_stream = CorrelationStream(prefix="SLAAR-LBSTREAM2")
    demo_stream.run()
    try:
        # Poll every 100 ms until the worker clears its running flag.
        while demo_stream.running:
            time.sleep(0.10)
    finally:
        demo_stream.stop()
|
|
|