fix: streamer and dap process
This commit is contained in:
@@ -23,6 +23,7 @@ class StreamProcessorPx(StreamProcessor):
|
||||
super().__init__(connector, config)
|
||||
self.metadata_consumer = None
|
||||
self.metadata = {}
|
||||
self.num_received_msgs = 0
|
||||
self.queue = Queue()
|
||||
self.start_metadata_consumer()
|
||||
|
||||
@@ -47,31 +48,38 @@ class StreamProcessorPx(StreamProcessor):
|
||||
first_key = next(iter(self.metadata))
|
||||
self.metadata.pop(first_key)
|
||||
proj_nr = int(topic.split("px_stream/projection_")[1].split("/")[0])
|
||||
self.metadata.update({proj_nr : msg.content['signals']})
|
||||
self.queue.put((proj_nr, msg.content['signals']))
|
||||
self.metadata.update({proj_nr: msg.content["signals"]})
|
||||
self.queue.put((proj_nr, msg.content["signals"]))
|
||||
|
||||
def _run_forever(self):
|
||||
""""""
|
||||
#TODO: Check if should skip entries in queue at beginning
|
||||
# TODO: Check if should skip entries in queue at beginning
|
||||
proj_nr, metadata = self.queue.get()
|
||||
self.num_received_msgs = 0
|
||||
data = []
|
||||
while self.queue.empty():
|
||||
start = time.time()
|
||||
data_msgs = self._get_data(proj_nr)
|
||||
data = [msg.content['signals']['data'] for msg in data_msgs if msg is not None]
|
||||
print(f"Processing took {time.time() - start}")
|
||||
data.extend([msg.content["signals"]["data"] for msg in data_msgs if msg is not None])
|
||||
# if len(data) > :
|
||||
result = self.process(data, metadata)
|
||||
if not result:
|
||||
continue
|
||||
msg = BECMessage.ProcessedDataMessage(data=result[0], metadata=result[1]).dumps()
|
||||
msg = BECMessage.ProcessedDataMessage(data=result[0][0], metadata=result[1]).dumps()
|
||||
print("Publishing result")
|
||||
self._publish_result(msg)
|
||||
|
||||
def _get_data(self, proj_nr: int) -> list:
|
||||
msgs = self.producer.lrange(f'px_stream/projection_{proj_nr}/data', 0, -1)
|
||||
msgs = self.producer.lrange(
|
||||
f"px_stream/projection_{proj_nr}/data", self.num_received_msgs, -1
|
||||
)
|
||||
if not msgs:
|
||||
return []
|
||||
self.num_received_msgs += len(msgs)
|
||||
return [BECMessage.DeviceMessage.loads(msg) for msg in msgs]
|
||||
|
||||
|
||||
def process(self, data: list, metadata: dict) -> Optional[Tuple[dict, dict]]:
|
||||
|
||||
if not data:
|
||||
return None
|
||||
# get the event data, hard coded
|
||||
@@ -102,10 +110,8 @@ class StreamProcessorPx(StreamProcessor):
|
||||
out = f2phase
|
||||
|
||||
stream_output = {
|
||||
self.config["output"]: {
|
||||
# 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)},
|
||||
0: {"z": np.asarray(out)},
|
||||
},
|
||||
# 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)},
|
||||
0: {"z": np.asarray(out)},
|
||||
# "input": self.config["input_xy"],
|
||||
}
|
||||
metadata["grid_scan"] = out.shape
|
||||
@@ -201,7 +207,7 @@ if __name__ == "__main__":
|
||||
"channels": ["data", "metadata", "q", "norm_sum"],
|
||||
"output": "px_dap_worker",
|
||||
"parameters": {
|
||||
"qranges" : [20,50], #TODO this will be signal from ROI selector
|
||||
"qranges": [20, 50], # TODO this will be signal from ROI selector
|
||||
"contrast": 0, # "contrast_stream" : 'px_contrast_stream',
|
||||
},
|
||||
}
|
||||
|
||||
@@ -13,9 +13,9 @@ def load_data() -> tuple:
|
||||
Returns
|
||||
"""
|
||||
proj_nr = 180
|
||||
basedir = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}/"
|
||||
basedir = f"/home/appel_c/Documents/test_data/projection_{proj_nr:06d}/"
|
||||
|
||||
metadata_name = f"/Users/janwyzula/PSI/test_data/projection_{proj_nr:06d}.json"
|
||||
metadata_name = f"/home/appel_c/Documents/test_data/projection_{proj_nr:06d}.json"
|
||||
with open(metadata_name) as file:
|
||||
metadata = json.load(file)
|
||||
|
||||
@@ -57,7 +57,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None:
|
||||
return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum}
|
||||
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
|
||||
bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe)
|
||||
return_dict = {"proj_nr" : proj_nr}
|
||||
return_dict = {"proj_nr": proj_nr}
|
||||
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
|
||||
bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe)
|
||||
pipe.execute()
|
||||
@@ -65,7 +65,7 @@ def send_data(data, q, norm_sum, bec_producer, metadata, proj_nr) -> None:
|
||||
return_dict = {"data": data[line, ...]}
|
||||
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
|
||||
print(f"Sending line {line}")
|
||||
bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
|
||||
bec_producer.lpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
|
||||
print(f"Time to send {time.time()-start} seconds")
|
||||
print(f"Rate {data.shape[0]/(time.time()-start)} Hz")
|
||||
print(f"Data volume {data.nbytes/1e6} MB")
|
||||
@@ -77,6 +77,6 @@ if __name__ == "__main__":
|
||||
proj_nr = 180
|
||||
while True:
|
||||
send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr)
|
||||
time.sleep(20)
|
||||
time.sleep(30)
|
||||
bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val")
|
||||
# proj_nr = proj_nr + 1
|
||||
|
||||
Reference in New Issue
Block a user