fix: fixed logic in data subscription to redis
This commit is contained in:
@@ -60,13 +60,20 @@ class StreamProcessorPx(StreamProcessor):
|
||||
while self.queue.empty():
|
||||
start = time.time()
|
||||
data_msgs = self._get_data(proj_nr)
|
||||
print(f"Processing took {time.time() - start}")
|
||||
data.extend([msg.content["signals"]["data"] for msg in data_msgs if msg is not None])
|
||||
# if len(data) > :
|
||||
#if len(data) > 80:
|
||||
# out = np.asarray(data)
|
||||
# result = ({0: {"z": np.sum(out, axis=(-1, -2))}}, {1: {}})
|
||||
#else:
|
||||
# continue
|
||||
print(f"Loading took {time.time() - start}")
|
||||
result = self.process(data, metadata)
|
||||
print(f"Processing took {time.time() - start}")
|
||||
if not result:
|
||||
continue
|
||||
msg = BECMessage.ProcessedDataMessage(data=result[0][0], metadata=result[1]).dumps()
|
||||
print(f"Length of data is {result[0][0]['z'].shape}")
|
||||
msg = BECMessage.ProcessedDataMessage(
|
||||
data=result[0][0], metadata=result[1]).dumps()
|
||||
print("Publishing result")
|
||||
self._publish_result(msg)
|
||||
|
||||
@@ -86,6 +93,7 @@ class StreamProcessorPx(StreamProcessor):
|
||||
azint_data = np.asarray(data)
|
||||
norm_sum = metadata["norm_sum"]
|
||||
q = metadata["q"]
|
||||
out = []
|
||||
|
||||
#####################################
|
||||
# Pick contrast 0:f1amp, 1:f2amp, 2:f2phase
|
||||
|
||||
@@ -65,7 +65,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None:
|
||||
return_dict = {"data": data[line, ...]}
|
||||
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
|
||||
print(f"Sending line {line}")
|
||||
bec_producer.lpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
|
||||
bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
|
||||
print(f"Time to send {time.time()-start} seconds")
|
||||
print(f"Rate {data.shape[0]/(time.time()-start)} Hz")
|
||||
print(f"Data volume {data.nbytes/1e6} MB")
|
||||
@@ -77,6 +77,6 @@ if __name__ == "__main__":
|
||||
proj_nr = 180
|
||||
while True:
|
||||
send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr)
|
||||
time.sleep(30)
|
||||
time.sleep(50)
|
||||
bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val")
|
||||
# proj_nr = proj_nr + 1
|
||||
|
||||
Reference in New Issue
Block a user