diff --git a/bec_plugins/data_processing/px_streamer.py b/bec_plugins/data_processing/px_streamer.py deleted file mode 100644 index 97e89aa..0000000 --- a/bec_plugins/data_processing/px_streamer.py +++ /dev/null @@ -1,82 +0,0 @@ -import os -import h5py -import numpy as np -import time -import json - -from bec_lib.core import RedisConnector, BECMessage - - -def load_data() -> tuple: - """ - - Returns - """ - proj_nr = 180 - basedir = f"/Users/janwyzula/PSI/test_data_reduced/projection_{proj_nr:06d}/" - - metadata_name = f"/Users/janwyzula/PSI/test_data_reduced/projection_{proj_nr:06d}.json" - with open(metadata_name) as file: - metadata = json.load(file) - - filenames = [fname for fname in os.listdir(basedir) if fname.endswith(".h5")] - filenames.sort() - - for ii, fname in enumerate(filenames): - with h5py.File(os.path.join(basedir, fname), "r") as h5file: - if ii == 0: - q = h5file["q"][...].T.squeeze() - norm_sum = h5file["norm_sum"][...] - data = np.zeros((len(filenames), *h5file["I_all"][...].shape)) - data[ii, ...] = h5file["I_all"][...] - - return data, q, norm_sum, metadata - - -def _get_projection_keys(producer): - keys = producer.keys("px_stream/projection_*") - if not keys: - return [] - return keys - - -def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None: - """""" - start = time.time() - - keys = _get_projection_keys(bec_producer) - pipe = bec_producer.pipeline() - proj_numbers = set(key.decode().split("px_stream/projection_")[1].split("/")[0] for key in keys) - if len(proj_numbers) > 5: - for entry in sorted(proj_numbers)[0:-5]: - for key in bec_producer.keys(f"px_stream/projection_{entry}/*"): - bec_producer.delete(topic=key, pipe=pipe) - print(f"Deleting {key}") - - # Add new data - return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum} - msg = BECMessage.DeviceMessage(signals=return_dict).dumps() - bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe) - return_dict = {"proj_nr": proj_nr} - msg = BECMessage.DeviceMessage(signals=return_dict).dumps() - bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe) - pipe.execute() - for line in range(data.shape[0]): - return_dict = {"data": data[line, ...]} - msg = BECMessage.DeviceMessage(signals=return_dict).dumps() - print(f"Sending line {line}") - bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) - print(f"Time to send {time.time()-start} seconds") - print(f"Rate {data.shape[0]/(time.time()-start)} Hz") - print(f"Data volume {data.nbytes/1e6} MB") - - -if __name__ == "__main__": - data, q, norm_sum, metadata = load_data() - bec_producer = RedisConnector(["localhost:6379"]).producer() - proj_nr = 180 - while True: - send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr) - time.sleep(30) - bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val") - # proj_nr = proj_nr + 1 diff --git a/bec_plugins/data_processing/saxs_imaging_processor.py b/bec_plugins/data_processing/saxs_imaging_processor.py index 18fcde1..3d544cf 100644 --- a/bec_plugins/data_processing/saxs_imaging_processor.py +++ b/bec_plugins/data_processing/saxs_imaging_processor.py @@ -187,9 +187,8 @@ class SaxsImagingProcessor(StreamProcessor): if not data: return None - start = time.time() + # TODO np.asarray is repsonsible for 95% of the processing time for function. azint_data = np.asarray(data) - print(f"Processing took {time.time() - start}") norm_sum = metadata["norm_sum"] q = metadata["q"] out = [] diff --git a/bec_plugins/data_processing/utils/saxs_imaging_streamsimulator.py b/bec_plugins/data_processing/utils/saxs_imaging_streamsimulator.py new file mode 100644 index 0000000..0c9d3b2 --- /dev/null +++ b/bec_plugins/data_processing/utils/saxs_imaging_streamsimulator.py @@ -0,0 +1,155 @@ +import os +import h5py +import numpy as np +import time +import json +import argparse + +from bec_lib.core import RedisConnector, BECMessage + + +def load_data(datadir: str, metadata_path: str) -> tuple: + """Load data from disk + + Args: + datapath (str): Path to the data directory with data for projection (h5 files) + metadata_path (str): Path to the metadata file + + Returns: + tuple: data, q, norm_sum, metadata + + """ + + with open(metadata_path) as file: + metadata = json.load(file) + + filenames = [fname for fname in os.listdir(datadir) if fname.endswith(".h5")] + filenames.sort() + + for ii, fname in enumerate(filenames): + with h5py.File(os.path.join(datadir, fname), "r") as h5file: + if ii == 0: + q = h5file["q"][...].T.squeeze() + norm_sum = h5file["norm_sum"][...] + data = np.zeros((len(filenames), *h5file["I_all"][...].shape)) + data[ii, ...] = h5file["I_all"][...] + + return data, q, norm_sum, metadata + + +def _get_projection_keys(producer) -> list: + """Get all keys for projections with endpoint px_stream/projection_* in redis + + Args: + producer (RedisProducer): Redis producer + + Returns: + list: List of keys or [] if no keys are found""" + keys = producer.keys("px_stream/projection_*") + if not keys: + return [] + return keys + + +def send_data( + data: np.ndarray, + q: np.ndarray, + norm_sum: np.ndarray, + bec_producer: RedisConnector.producer, + metadata: dict, + proj_nr: int, +) -> None: + """Send data to redis and delete old data > 5 projections + + Args: + data (np.ndarray): Data to send + q (np.ndarray): q values + norm_sum (np.ndarray): Normalization sum + bec_producer (RedisProducer): Redis producer + metadata (dict): Metadata + proj_nr (int): Projection number + + Returns: + None + + """ + start = time.time() + + keys = _get_projection_keys(bec_producer) + pipe = bec_producer.pipeline() + proj_numbers = set(key.decode().split("px_stream/projection_")[1].split("/")[0] for key in keys) + if len(proj_numbers) > 5: + for entry in sorted(proj_numbers)[0:-5]: + for key in bec_producer.keys(f"px_stream/projection_{entry}/*"): + bec_producer.delete(topic=key, pipe=pipe) + print(f"Deleting {key}") + + return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum} + msg = BECMessage.DeviceMessage(signals=return_dict).dumps() + bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe) + return_dict = {"proj_nr": proj_nr} + msg = BECMessage.DeviceMessage(signals=return_dict).dumps() + bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe) + pipe.execute() + + for line in range(data.shape[0]): + return_dict = {"data": data[line, ...]} + msg = BECMessage.DeviceMessage(signals=return_dict).dumps() + print(f"Sending line {line}") + bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg) + print(f"Time to send {time.time()-start} seconds") + print(f"Rate {data.shape[0]/(time.time()-start)} Hz") + print(f"Data volume {data.nbytes/1e6} MB") + + +if __name__ == "__main__": + """Start the stream simulator, defaults to px_stream/projection_* in redis on localhost:6379 + + Example usage: + >>> python saxs_imaging_streamsimulator.py -d ~/datadir/ -m ~/metadatafile.json -p 180 -d 30 -r localhost:6379 + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "-d", + "--datadir", + type=str, + help="filepath to datadir for projection files (in h5 format)", + required=True, + ) + parser.add_argument( + "-m", + "--metadata", + type=str, + help="filepath to metadata json file", + required=True, + ) + parser.add_argument( + "-p", + "--proj_nr", + type=int, + help="Projection number matching the data", + required=True, + ) + parser.add_argument( + "-w", + "--wait_delay", + type=int, + help="delay between sending data in seconds (int)", + default=30, + ) + parser.add_argument( + "-r", + "--redis", + type=str, + help="Redis_host:port", + default="localhost:6379", + ) + values = parser.parse_args() + data, q, norm_sum, metadata = load_data(datadir=values.datadir, metadata_path=values.metadata) + bec_producer = RedisConnector([f"{values.redis}"]).producer() + proj_nr = values.proj_nr + delay = values.wait_delay + while True: + send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr) + time.sleep(delay) + bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val")