refactor: renamed and refactor streamer simulation

This commit is contained in:
2023-08-10 15:55:54 +02:00
parent 25dcabbb7c
commit d4b9a12ac9
3 changed files with 156 additions and 84 deletions
@@ -0,0 +1,155 @@
import os
import h5py
import numpy as np
import time
import json
import argparse
from bec_lib.core import RedisConnector, BECMessage
def load_data(datadir: str, metadata_path: str) -> tuple:
"""Load data from disk
Args:
datapath (str): Path to the data directory with data for projection (h5 files)
metadata_path (str): Path to the metadata file
Returns:
tuple: data, q, norm_sum, metadata
"""
with open(metadata_path) as file:
metadata = json.load(file)
filenames = [fname for fname in os.listdir(datadir) if fname.endswith(".h5")]
filenames.sort()
for ii, fname in enumerate(filenames):
with h5py.File(os.path.join(datadir, fname), "r") as h5file:
if ii == 0:
q = h5file["q"][...].T.squeeze()
norm_sum = h5file["norm_sum"][...]
data = np.zeros((len(filenames), *h5file["I_all"][...].shape))
data[ii, ...] = h5file["I_all"][...]
return data, q, norm_sum, metadata
def _get_projection_keys(producer) -> list:
"""Get all keys for projections with endpoint px_stream/projection_* in redis
Args:
producer (RedisProducer): Redis producer
Returns:
list: List of keys or [] if no keys are found"""
keys = producer.keys("px_stream/projection_*")
if not keys:
return []
return keys
def send_data(
data: np.ndarray,
q: np.ndarray,
norm_sum: np.ndarray,
bec_producer: RedisConnector.producer,
metadata: dict,
proj_nr: int,
) -> None:
"""Send data to redis and delete old data > 5 projections
Args:
data (np.ndarray): Data to send
q (np.ndarray): q values
norm_sum (np.ndarray): Normalization sum
bec_producer (RedisProducer): Redis producer
metadata (dict): Metadata
proj_nr (int): Projection number
Returns:
None
"""
start = time.time()
keys = _get_projection_keys(bec_producer)
pipe = bec_producer.pipeline()
proj_numbers = set(key.decode().split("px_stream/projection_")[1].split("/")[0] for key in keys)
if len(proj_numbers) > 5:
for entry in sorted(proj_numbers)[0:-5]:
for key in bec_producer.keys(f"px_stream/projection_{entry}/*"):
bec_producer.delete(topic=key, pipe=pipe)
print(f"Deleting {key}")
return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe)
return_dict = {"proj_nr": proj_nr}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe)
pipe.execute()
for line in range(data.shape[0]):
return_dict = {"data": data[line, ...]}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
print(f"Sending line {line}")
bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
print(f"Time to send {time.time()-start} seconds")
print(f"Rate {data.shape[0]/(time.time()-start)} Hz")
print(f"Data volume {data.nbytes/1e6} MB")
if __name__ == "__main__":
"""Start the stream simulator, defaults to px_stream/projection_* in redis on localhost:6379
Example usage:
>>> python saxs_imaging_streamsimulator.py -d ~/datadir/ -m ~/metadatafile.json -p 180 -d 30 -r localhost:6379
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"-d",
"--datadir",
type=str,
help="filepath to datadir for projection files (in h5 format)",
required=True,
)
parser.add_argument(
"-m",
"--metadata",
type=str,
help="filepath to metadata json file",
required=True,
)
parser.add_argument(
"-p",
"--proj_nr",
type=int,
help="Projection number matching the data",
required=True,
)
parser.add_argument(
"-w",
"--wait_delay",
type=int,
help="delay between sending data in seconds (int)",
default=30,
)
parser.add_argument(
"-r",
"--redis",
type=str,
help="Redis_host:port",
default="localhost:6379",
)
values = parser.parse_args()
data, q, norm_sum, metadata = load_data(datadir=values.datadir, metadata_path=values.metadata)
bec_producer = RedisConnector([f"{values.redis}"]).producer()
proj_nr = values.proj_nr
delay = values.wait_delay
while True:
send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr)
time.sleep(delay)
bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val")