diff --git a/bec_plugins/data_processing/px_example.py b/bec_plugins/data_processing/px_example.py index 4f4dadb..e0ca034 100644 --- a/bec_plugins/data_processing/px_example.py +++ b/bec_plugins/data_processing/px_example.py @@ -104,6 +104,7 @@ class StreamProcessorPx(StreamProcessor): stream_output = { self.config["output"]: { # 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)}, + 0: {"z": np.asarray(out)}, }, # "input": self.config["input_xy"], } @@ -200,8 +201,7 @@ if __name__ == "__main__": "channels": ["data", "metadata", "q", "norm_sum"], "output": "px_dap_worker", "parameters": { - "qranges" : [20,50], - "roi_stream": "px_roi_stream", + "qranges" : [20,50], #TODO this will be signal from ROI selector "contrast": 0, # "contrast_stream" : 'px_contrast_stream', }, } diff --git a/bec_plugins/data_processing/px_streamer.py b/bec_plugins/data_processing/px_streamer.py index 08f8aa2..408a3c3 100644 --- a/bec_plugins/data_processing/px_streamer.py +++ b/bec_plugins/data_processing/px_streamer.py @@ -13,9 +13,9 @@ def load_data() -> tuple: Returns """ proj_nr = 180 - basedir = f"/das/work/units/pem/p19745/online_data/analysis/radial_integration_eiger/projection_{proj_nr:06d}/" + basedir = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}/" - metadata_name = f"/das/work/units/pem/p19745/online_data/metadata/projection_{proj_nr:06d}.json" + metadata_name = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}.json" with open(metadata_name) as file: metadata = json.load(file) @@ -57,7 +57,9 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None: return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum} msg = BECMessage.DeviceMessage(signals=return_dict).dumps() bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe) - + return_dict = {"proj_nr" : proj_nr} + msg = BECMessage.DeviceMessage(signals=return_dict).dumps() + bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe) pipe.execute() for line in range(data.shape[0]): return_dict = {"data": data[line, ...]} @@ -75,5 +77,6 @@ if __name__ == "__main__": proj_nr = 180 while True: send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr) - time.sleep(1) - proj_nr = proj_nr + 1 + time.sleep(20) + bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val") + # proj_nr = proj_nr + 1