Compare commits
16 Commits
aac524dd36
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f23cc8110 | |||
| a58015b278 | |||
| aef6933fab | |||
| a89a6aded6 | |||
| 4de0c8b7d7 | |||
| ee79ee98a0 | |||
| c58fbbb8fa | |||
| 5273e0cf46 | |||
| e514061173 | |||
| 8e1073cfd1 | |||
| 17a739263d | |||
| a69b795450 | |||
| 10701994ec | |||
| deb9253acf | |||
| cce141423d | |||
| 941ab51856 |
@@ -9,10 +9,14 @@ from zmqsocks import ZMQSocketsAccumulator, make_address
|
||||
OUTPUT_DIR = "/gpfs/photonics/swissfel/buffer/dap/data"
|
||||
|
||||
ENTRIES_TO_SKIP = [
|
||||
# send: ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.
|
||||
"roi_intensities_proj_x",
|
||||
# recv: ValueError: cannot reshape array of size 4 into shape (2,)
|
||||
"roi_intensities_x"
|
||||
"custom_script",
|
||||
"detector_name",
|
||||
"gain_file",
|
||||
"htype",
|
||||
"pedestal_file",
|
||||
"pulse_id",
|
||||
"timestamp",
|
||||
"type"
|
||||
]
|
||||
|
||||
|
||||
@@ -67,6 +71,10 @@ def accumulate(accumulator_addr, bsread_host, bsread_port, bsread_window):
|
||||
if not sender:
|
||||
continue
|
||||
|
||||
enable_bsread = results.get("enable_bsread", False)
|
||||
if not enable_bsread:
|
||||
continue
|
||||
|
||||
timestamp = tuple(results["timestamp"])
|
||||
data = pack_bsread_data(results, detector, skip=ENTRIES_TO_SKIP)
|
||||
sorter.add(pulse_id, (timestamp, data))
|
||||
|
||||
@@ -33,7 +33,7 @@ def calc_radial_integration(results, data, pixel_mask):
|
||||
rp = rp / integral_silent_region
|
||||
results["radint_normalised"] = [silent_min, silent_max]
|
||||
|
||||
results["radint_I"] = rp[r_min:].tolist() #TODO: why not stop at r_max?
|
||||
results["radint_I"] = rp[r_min:] #TODO: why not stop at r_max?
|
||||
results["radint_q"] = [r_min, r_max]
|
||||
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ def calc_roi(results, data):
|
||||
roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized
|
||||
|
||||
roi_indices_x = [ix1, ix2]
|
||||
roi_proj_x = np.nansum(data_roi, axis=0).tolist()
|
||||
roi_proj_x = np.nansum(data_roi, axis=0)
|
||||
|
||||
roi_intensities.append(roi_sum)
|
||||
roi_intensities_normalised.append(roi_sum_norm)
|
||||
|
||||
@@ -36,7 +36,7 @@ def calc_spi_analysis(results, data):
|
||||
hit = (photon_percentage > spi_threshold_hit_percentage)
|
||||
|
||||
results["number_of_spots"] = photon_percentage
|
||||
results["is_hit_frame"] = bool(hit) # json does not like numpy bool_ scalars
|
||||
results["is_hit_frame"] = hit
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -205,17 +205,16 @@ def _calc_streakfinder_analysis(results, snr, mask):
|
||||
streak_lengths = np.sqrt(
|
||||
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2) +
|
||||
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2)
|
||||
).tolist()
|
||||
)
|
||||
|
||||
streak_lines = streak_lines.T
|
||||
_, number_of_streaks = streak_lines.shape
|
||||
|
||||
list_result = streak_lines.tolist() # arr(4, n_lines); coord x0, y0, x1, y1
|
||||
bragg_counts = [streak.total_mass() for streak in detected_streaks]
|
||||
|
||||
results["number_of_streaks"] = number_of_streaks
|
||||
results["is_hit_frame"] = (number_of_streaks > min_hit_streaks)
|
||||
results["streaks"] = list_result
|
||||
results["streaks"] = streak_lines # arr(4, n_lines); coord x0, y0, x1, y1
|
||||
results["streak_lengths"] = streak_lengths
|
||||
results["bragg_counts"] = bragg_counts
|
||||
|
||||
|
||||
@@ -6,17 +6,16 @@ def calc_apply_threshold(results, data, value=None, copy=False):
|
||||
if not apply_threshold:
|
||||
return data
|
||||
|
||||
for k in ("threshold_min", "threshold_max"):
|
||||
if k not in results:
|
||||
return data
|
||||
threshold_min = results.get("threshold_min")
|
||||
threshold_max = results.get("threshold_max")
|
||||
|
||||
if threshold_min is None and threshold_max is None:
|
||||
return data
|
||||
|
||||
if value is None:
|
||||
threshold_value = results.get("threshold_value", "NaN")
|
||||
value = 0 if threshold_value == "0" else np.nan #TODO
|
||||
|
||||
threshold_min = float(results["threshold_min"])
|
||||
threshold_max = float(results["threshold_max"])
|
||||
|
||||
if copy:
|
||||
data = data.copy() # do the following in-place changes on a copy
|
||||
|
||||
@@ -28,12 +27,15 @@ def calc_apply_threshold(results, data, value=None, copy=False):
|
||||
|
||||
def threshold(data, vmin, vmax, replacement):
|
||||
"""
|
||||
threshold data in place by replacing values < vmin and values > vmax with replacement
|
||||
threshold data in place by replacing values < vmin and values >= vmax with replacement
|
||||
"""
|
||||
# if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed
|
||||
vmin, vmax = sorted((vmin, vmax))
|
||||
data[data < vmin] = replacement
|
||||
data[data > vmax] = replacement
|
||||
if vmin is not None and vmax is not None:
|
||||
# if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed
|
||||
vmin, vmax = sorted((vmin, vmax))
|
||||
if vmin is not None:
|
||||
data[data < vmin] = replacement
|
||||
if vmax is not None:
|
||||
data[data >= vmax] = replacement
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from .bsreadext import make_bsread_sender, pack_bsread_data
|
||||
from .bits import read_bit
|
||||
from .bufjson import BufferedJSON
|
||||
from .filehandler import FileHandler
|
||||
from .jsonext import ExtendedJSONEncoder
|
||||
from .randskip import randskip
|
||||
from .sorter import Sorter
|
||||
from .timestamp import make_bsread_timestamp
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from bsread.sender import Sender, PUB
|
||||
import numpy as np
|
||||
from bsread import Sender, PUB
|
||||
|
||||
|
||||
def make_bsread_sender(host="*", port=None):
|
||||
@@ -17,11 +18,22 @@ def pack_bsread_data(orig, prefix, skip=None):
|
||||
if k in skip:
|
||||
continue
|
||||
if isinstance(v, bool):
|
||||
# bsread expects bools as ints
|
||||
v = int(v)
|
||||
elif isinstance(v, list) and not v:
|
||||
v = None
|
||||
elif isinstance(v, list):
|
||||
# bsread fails for empty lists and non-1D lists
|
||||
v = list_to_array(v)
|
||||
data[f"{prefix}:{k}"] = v
|
||||
return data
|
||||
|
||||
|
||||
def list_to_array(x):
|
||||
try:
|
||||
# let numpy figure out the dtype
|
||||
return np.array(x)
|
||||
except ValueError:
|
||||
# the above fails for ragged lists but bsread also cannot handle object arrays
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
22
dap/utils/jsonext.py
Normal file
22
dap/utils/jsonext.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import json
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class ExtendedJSONEncoder(json.JSONEncoder):
|
||||
|
||||
def default(self, obj):
|
||||
if isinstance(obj, np.ndarray):
|
||||
return obj.tolist()
|
||||
elif isinstance(obj, np.generic): # covers all numpy scalars
|
||||
return obj.item()
|
||||
elif isinstance(obj, complex): # covers builtin complex
|
||||
return {"real": obj.real, "imag": obj.imag}
|
||||
elif isinstance(obj, Path):
|
||||
return str(obj)
|
||||
elif isinstance(obj, set):
|
||||
return sorted(obj)
|
||||
return super().default(obj)
|
||||
|
||||
|
||||
|
||||
@@ -110,8 +110,8 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_fra
|
||||
if pixel_mask is not None:
|
||||
saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels)
|
||||
results["saturated_pixels"] = len(saturated_pixels_x)
|
||||
results["saturated_pixels_x"] = saturated_pixels_x.tolist()
|
||||
results["saturated_pixels_y"] = saturated_pixels_y.tolist()
|
||||
results["saturated_pixels_x"] = saturated_pixels_x
|
||||
results["saturated_pixels_y"] = saturated_pixels_y
|
||||
|
||||
|
||||
calc_radial_integration(results, image, pixel_mask)
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import numpy as np
|
||||
import zmq
|
||||
|
||||
from utils import ExtendedJSONEncoder
|
||||
|
||||
|
||||
FLAGS = 0
|
||||
|
||||
@@ -61,10 +63,10 @@ class ZMQSocketsWorker:
|
||||
|
||||
|
||||
def send_accumulator(self, results):
|
||||
self.accumulator_socket.send_json(results, FLAGS)
|
||||
self.accumulator_socket.send_json(results, FLAGS, cls=ExtendedJSONEncoder)
|
||||
|
||||
def send_visualisation(self, results, data):
|
||||
self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
|
||||
self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE, cls=ExtendedJSONEncoder)
|
||||
self.visualisation_socket.send(data, FLAGS, copy=True, track=True)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user