From b88346182e9831e0d345b02f6b4617894e2eaa75 Mon Sep 17 00:00:00 2001 From: appel_c Date: Wed, 9 Aug 2023 11:20:11 +0200 Subject: [PATCH] fix: streamer and dap process --- bec_plugins/data_processing/px_example.py | 32 +++++++++++++--------- bec_plugins/data_processing/px_streamer.py | 10 +++---- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/bec_plugins/data_processing/px_example.py b/bec_plugins/data_processing/px_example.py index e0ca034..8fb7a4f 100644 --- a/bec_plugins/data_processing/px_example.py +++ b/bec_plugins/data_processing/px_example.py @@ -23,6 +23,7 @@ class StreamProcessorPx(StreamProcessor): super().__init__(connector, config) self.metadata_consumer = None self.metadata = {} + self.num_received_msgs = 0 self.queue = Queue() self.start_metadata_consumer() @@ -47,31 +48,38 @@ class StreamProcessorPx(StreamProcessor): first_key = next(iter(self.metadata)) self.metadata.pop(first_key) proj_nr = int(topic.split("px_stream/projection_")[1].split("/")[0]) - self.metadata.update({proj_nr : msg.content['signals']}) - self.queue.put((proj_nr, msg.content['signals'])) + self.metadata.update({proj_nr: msg.content["signals"]}) + self.queue.put((proj_nr, msg.content["signals"])) def _run_forever(self): """""" - #TODO: Check if should skip entries in queue at beginning + # TODO: Check if should skip entries in queue at beginning proj_nr, metadata = self.queue.get() + self.num_received_msgs = 0 + data = [] while self.queue.empty(): + start = time.time() data_msgs = self._get_data(proj_nr) - data = [msg.content['signals']['data'] for msg in data_msgs if msg is not None] + print(f"Processing took {time.time() - start}") + data.extend([msg.content["signals"]["data"] for msg in data_msgs if msg is not None]) + # if len(data) > : result = self.process(data, metadata) if not result: continue - msg = BECMessage.ProcessedDataMessage(data=result[0], metadata=result[1]).dumps() + msg = BECMessage.ProcessedDataMessage(data=result[0][0], metadata=result[1]).dumps() + print("Publishing result") self._publish_result(msg) def _get_data(self, proj_nr: int) -> list: - msgs = self.producer.lrange(f'px_stream/projection_{proj_nr}/data', 0, -1) + msgs = self.producer.lrange( + f"px_stream/projection_{proj_nr}/data", self.num_received_msgs, -1 + ) if not msgs: return [] + self.num_received_msgs += len(msgs) return [BECMessage.DeviceMessage.loads(msg) for msg in msgs] - def process(self, data: list, metadata: dict) -> Optional[Tuple[dict, dict]]: - if not data: return None # get the event data, hard coded @@ -102,10 +110,8 @@ class StreamProcessorPx(StreamProcessor): out = f2phase stream_output = { - self.config["output"]: { - # 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)}, - 0: {"z": np.asarray(out)}, - }, + # 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)}, + 0: {"z": np.asarray(out)}, # "input": self.config["input_xy"], } metadata["grid_scan"] = out.shape @@ -201,7 +207,7 @@ if __name__ == "__main__": "channels": ["data", "metadata", "q", "norm_sum"], "output": "px_dap_worker", "parameters": { - "qranges" : [20,50], #TODO this will be signal from ROI selector + "qranges": [20, 50], # TODO this will be signal from ROI selector "contrast": 0, # "contrast_stream" : 'px_contrast_stream', }, } diff --git a/bec_plugins/data_processing/px_streamer.py b/bec_plugins/data_processing/px_streamer.py index 408a3c3..6bd84d8 100644 --- a/bec_plugins/data_processing/px_streamer.py +++ b/bec_plugins/data_processing/px_streamer.py @@ -13,9 +13,9 @@ def load_data() -> tuple: Returns """ proj_nr = 180 - basedir = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}/" + basedir = f"/home/appel_c/Documents/test_data/projection_{proj_nr:06d}/" - metadata_name = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}.json" + metadata_name = f"/home/appel_c/Documents/test_data/projection_{proj_nr:06d}.json" with open(metadata_name) as file: metadata = json.load(file) @@ -57,7 +57,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None: return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum} msg = BECMessage.DeviceMessage(signals=return_dict).dumps() bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe) - return_dict = {"proj_nr" : proj_nr} + return_dict = {"proj_nr": proj_nr} msg = BECMessage.DeviceMessage(signals=return_dict).dumps() bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe) pipe.execute() @@ -65,7 +65,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None: return_dict = {"data": data[line, ...]} msg = BECMessage.DeviceMessage(signals=return_dict).dumps() print(f"Sending line {line}") - bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) + bec_producer.lpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) print(f"Time to send {time.time()-start} seconds") print(f"Rate {data.shape[0]/(time.time()-start)} Hz") print(f"Data volume {data.nbytes/1e6} MB") @@ -77,6 +77,6 @@ if __name__ == "__main__": proj_nr = 180 while True: send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr) - time.sleep(20) + time.sleep(30) bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val") # proj_nr = proj_nr + 1