From 37705f76b00c2225ece89b0b759e3adcf6206daf Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 9 Aug 2023 21:40:21 +0200 Subject: [PATCH] fix: fixed logic in data subscription to redis --- bec_plugins/data_processing/px_example.py | 14 +++++++++++--- bec_plugins/data_processing/px_streamer.py | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bec_plugins/data_processing/px_example.py b/bec_plugins/data_processing/px_example.py index 8fb7a4f..c42876f 100644 --- a/bec_plugins/data_processing/px_example.py +++ b/bec_plugins/data_processing/px_example.py @@ -60,13 +60,20 @@ class StreamProcessorPx(StreamProcessor): while self.queue.empty(): start = time.time() data_msgs = self._get_data(proj_nr) - print(f"Processing took {time.time() - start}") data.extend([msg.content["signals"]["data"] for msg in data_msgs if msg is not None]) - # if len(data) > : + #if len(data) > 80: + # out = np.asarray(data) + # result = ({0: {"z": np.sum(out, axis=(-1, -2))}}, {1: {}}) + #else: + # continue + print(f"Loading took {time.time() - start}") result = self.process(data, metadata) + print(f"Processing took {time.time() - start}") if not result: continue - msg = BECMessage.ProcessedDataMessage(data=result[0][0], metadata=result[1]).dumps() + print(f"Length of data is {result[0][0]['z'].shape}") + msg = BECMessage.ProcessedDataMessage( + data=result[0][0], metadata=result[1]).dumps() print("Publishing result") self._publish_result(msg) @@ -86,6 +93,7 @@ class StreamProcessorPx(StreamProcessor): azint_data = np.asarray(data) norm_sum = metadata["norm_sum"] q = metadata["q"] + out = [] ##################################### # Pick contrast 0:f1amp, 1:f2amp, 2:f2phase diff --git a/bec_plugins/data_processing/px_streamer.py b/bec_plugins/data_processing/px_streamer.py index 6bd84d8..825d2ae 100644 --- a/bec_plugins/data_processing/px_streamer.py +++ b/bec_plugins/data_processing/px_streamer.py @@ -65,7 +65,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None: return_dict = {"data": data[line, ...]} msg = BECMessage.DeviceMessage(signals=return_dict).dumps() print(f"Sending line {line}") - bec_producer.lpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) + bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) print(f"Time to send {time.time()-start} seconds") print(f"Rate {data.shape[0]/(time.time()-start)} Hz") print(f"Data volume {data.nbytes/1e6} MB") @@ -77,6 +77,6 @@ if __name__ == "__main__": proj_nr = 180 while True: send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr) - time.sleep(30) + time.sleep(50) bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val") # proj_nr = proj_nr + 1