fix: bugfix of process

This commit is contained in:
2023-08-11 13:31:36 +02:00
parent be862ef526
commit 56f1f54646

View File

@@ -121,6 +121,7 @@ class SaxsImagingProcessor(StreamProcessor):
# TODO hardcoded endpoint, possibe to use more general solution?
msg = self.producer.get(topic=f"px_stream/projection_{self.proj_nr}/metadata")
msg_raw = BECMessage.DeviceMessage.loads(msg)
self._update_queue(msg_raw.content["signals"], self.proj_nr)
def _update_queue(self, metadata: dict, proj_nr: int) -> None:
"""Update the process queue.
@@ -191,12 +192,6 @@ class SaxsImagingProcessor(StreamProcessor):
self.proj_nr = int(topic.split("px_stream/projection_")[1].split("/")[0])
self._update_queue(msg.content["signals"], self.proj_nr)
def _init_data_output(self) -> None:
"""Initialize the data output.
Not yet used. Should be used to initialize the output for the processed data.
"""
self.data = None
def start_data_consumer(self) -> None:
"""function from the parent class that we don't want to use here"""
pass
@@ -214,17 +209,13 @@ class SaxsImagingProcessor(StreamProcessor):
proj_nr, metadata = self.queue.get()
self.num_received_msgs = 0
data = []
self.data = None
while self.queue.empty():
data_msgs = self._get_data(proj_nr)
if data_msgs is not None:
data.extend(
[msg.content["signals"]["data"] for msg in data_msgs if msg is not None]
)
# print(f"Loading took {time.time() - start}")
# start = time.time()
result = self.process(data, metadata)
# print(f"Processing took {time.time() - start}")
start = time.time()
self._get_data(proj_nr, metadata)
start = time.time()
result = self.process(self.data, metadata)
print(f"Processing took {time.time() - start}")
if result is None:
continue
print(f"Length of data is {result[0][0]['z'].shape}")
@@ -232,7 +223,7 @@ class SaxsImagingProcessor(StreamProcessor):
print("Publishing result")
self._publish_result(msg)
def _get_data(self, proj_nr: int) -> list:
def _get_data(self, proj_nr: int, metadata: dict) -> None:
"""Get data for given proj_nr from redis.
Args:
@@ -242,16 +233,35 @@ class SaxsImagingProcessor(StreamProcessor):
list: List of azimuthal integrated data.
"""
start = time.time()
msgs = self.producer.lrange(
f"px_stream/projection_{proj_nr}/data", self.num_received_msgs, -1
)
print(f"Loading of {len(msgs)} took {time.time() - start}")
if not msgs:
return None
self.num_received_msgs += len(msgs)
return [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
def process(self, data: list, metadata: dict) -> Optional[Tuple[dict, dict]]:
frame_shape = BECMessage.DeviceMessage.loads(msgs[0]).content["signals"]["data"].shape[-2:]
if self.data is None:
start = time.time()
self.data = np.empty(
(
metadata["metadata"]["number_of_rows"],
metadata["metadata"]["number_of_columns"],
*frame_shape,
)
)
print(f"Init output took {time.time() - start}")
start = time.time()
for msg in msgs:
self.data[
self.num_received_msgs : self.num_received_msgs + 1, ...
] = BECMessage.DeviceMessage.loads(msg).content["signals"]["data"]
self.num_received_msgs += 1
print(f"Casting data to array took {time.time() - start}")
def process(self, data: np.ndarray, metadata: dict) -> Optional[Tuple[dict, dict]]:
"""Process the scanning SAXS data
Args:
@@ -263,10 +273,10 @@ class SaxsImagingProcessor(StreamProcessor):
"""
if not data:
if data is None:
return None
# TODO np.asarray is repsonsible for 95% of the processing time for function.
azint_data = np.asarray(data)
azint_data = data[0 : self.num_received_msgs, ...]
norm_sum = metadata["norm_sum"]
q = metadata["q"]
out = []