From 56f1f54646389922c87e8e8ee1d79c08ec48087c Mon Sep 17 00:00:00 2001 From: appel_c Date: Fri, 11 Aug 2023 13:31:36 +0200 Subject: [PATCH] fix: bugfix of process --- .../data_processing/saxs_imaging_processor.py | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/bec_plugins/data_processing/saxs_imaging_processor.py b/bec_plugins/data_processing/saxs_imaging_processor.py index 806e438..abb660b 100644 --- a/bec_plugins/data_processing/saxs_imaging_processor.py +++ b/bec_plugins/data_processing/saxs_imaging_processor.py @@ -121,6 +121,7 @@ class SaxsImagingProcessor(StreamProcessor): # TODO hardcoded endpoint, possibe to use more general solution? msg = self.producer.get(topic=f"px_stream/projection_{self.proj_nr}/metadata") msg_raw = BECMessage.DeviceMessage.loads(msg) + self._update_queue(msg_raw.content["signals"], self.proj_nr) def _update_queue(self, metadata: dict, proj_nr: int) -> None: """Update the process queue. @@ -191,12 +192,6 @@ class SaxsImagingProcessor(StreamProcessor): self.proj_nr = int(topic.split("px_stream/projection_")[1].split("/")[0]) self._update_queue(msg.content["signals"], self.proj_nr) - def _init_data_output(self) -> None: - """Initialize the data output. - Not yet used. Should be used to initialize the output for the processed data. - """ - self.data = None - def start_data_consumer(self) -> None: """function from the parent class that we don't want to use here""" pass @@ -214,17 +209,13 @@ class SaxsImagingProcessor(StreamProcessor): proj_nr, metadata = self.queue.get() self.num_received_msgs = 0 - data = [] + self.data = None while self.queue.empty(): - data_msgs = self._get_data(proj_nr) - if data_msgs is not None: - data.extend( - [msg.content["signals"]["data"] for msg in data_msgs if msg is not None] - ) - # print(f"Loading took {time.time() - start}") - # start = time.time() - result = self.process(data, metadata) - # print(f"Processing took {time.time() - start}") + start = time.time() + self._get_data(proj_nr, metadata) + start = time.time() + result = self.process(self.data, metadata) + print(f"Processing took {time.time() - start}") if result is None: continue print(f"Length of data is {result[0][0]['z'].shape}") @@ -232,7 +223,7 @@ class SaxsImagingProcessor(StreamProcessor): print("Publishing result") self._publish_result(msg) - def _get_data(self, proj_nr: int) -> list: + def _get_data(self, proj_nr: int, metadata: dict) -> None: """Get data for given proj_nr from redis. Args: @@ -242,16 +233,35 @@ class SaxsImagingProcessor(StreamProcessor): list: List of azimuthal integrated data. """ - + start = time.time() msgs = self.producer.lrange( f"px_stream/projection_{proj_nr}/data", self.num_received_msgs, -1 ) + print(f"Loading of {len(msgs)} took {time.time() - start}") if not msgs: return None - self.num_received_msgs += len(msgs) - return [BECMessage.DeviceMessage.loads(msg) for msg in msgs] - def process(self, data: list, metadata: dict) -> Optional[Tuple[dict, dict]]: + frame_shape = BECMessage.DeviceMessage.loads(msgs[0]).content["signals"]["data"].shape[-2:] + + if self.data is None: + start = time.time() + self.data = np.empty( + ( + metadata["metadata"]["number_of_rows"], + metadata["metadata"]["number_of_columns"], + *frame_shape, + ) + ) + print(f"Init output took {time.time() - start}") + start = time.time() + for msg in msgs: + self.data[ + self.num_received_msgs : self.num_received_msgs + 1, ... + ] = BECMessage.DeviceMessage.loads(msg).content["signals"]["data"] + self.num_received_msgs += 1 + print(f"Casting data to array took {time.time() - start}") + + def process(self, data: np.ndarray, metadata: dict) -> Optional[Tuple[dict, dict]]: """Process the scanning SAXS data Args: @@ -263,10 +273,10 @@ class SaxsImagingProcessor(StreamProcessor): """ - if not data: + if data is None: return None # TODO np.asarray is repsonsible for 95% of the processing time for function. - azint_data = np.asarray(data) + azint_data = data[0 : self.num_received_msgs, ...] norm_sum = metadata["norm_sum"] q = metadata["q"] out = []