Compare commits

..

23 Commits

Author SHA1 Message Date
8f23cc8110 added switch to enable/disable bsread sender 2026-02-06 12:10:59 +01:00
a58015b278 peak_list_* is not a numpy array 2026-02-05 18:36:36 +01:00
aef6933fab adjust to new import logic 2025-11-24 19:19:08 +01:00
a89a6aded6 handle equals case correctly (i.e., like indices) 2025-11-19 18:53:40 +01:00
4de0c8b7d7 allow vmin/vmax = None for "no limit" 2025-11-19 18:53:33 +01:00
ee79ee98a0 actually do send Nones 2025-11-04 11:32:15 +01:00
c58fbbb8fa added another comment 2025-11-04 11:01:22 +01:00
5273e0cf46 do not send Nones 2025-11-04 10:37:12 +01:00
e514061173 added type 2025-11-04 10:34:17 +01:00
8e1073cfd1 removed skipping erroring cases; added skipping constant entries 2025-11-04 10:24:49 +01:00
17a739263d bsread cannot handle object arrays 2025-11-03 22:10:45 +01:00
a69b795450 convert lists to numpy arrays (in order to handle non-1D lists) 2025-11-03 21:31:09 +01:00
10701994ec removed all manual array.tolist() conversions (and similar) that were needed for json serializability 2025-11-03 19:45:37 +01:00
deb9253acf this is not a package 2025-11-03 16:57:09 +01:00
cce141423d use ExtendedJSONEncoder 2025-11-03 16:39:47 +01:00
941ab51856 added ExtendedJSONEncoder 2025-11-03 16:39:40 +01:00
aac524dd36 cooldown of a minute is enough; cache outside cooldown (once cached, no cooldown check needed anymore) 2025-11-03 09:41:13 +01:00
6a441e8c63 added cooldown to avoid checking for a non-existent file at 100 Hz 2025-11-02 16:50:16 +01:00
46fca4a6b5 added custom script 2025-11-01 20:10:23 +01:00
28f7248500 de-duplicated calc_apply_threshold 2025-10-31 18:38:58 +01:00
603f9215e8 added replacement value as argument 2025-10-31 18:35:11 +01:00
36c61c7b8e added boolean copy switch 2025-10-31 18:32:07 +01:00
32adb370cd order 2025-10-31 18:20:51 +01:00
16 changed files with 243 additions and 76 deletions

View File

@@ -9,10 +9,14 @@ from zmqsocks import ZMQSocketsAccumulator, make_address
OUTPUT_DIR = "/gpfs/photonics/swissfel/buffer/dap/data"
ENTRIES_TO_SKIP = [
# send: ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part.
"roi_intensities_proj_x",
# recv: ValueError: cannot reshape array of size 4 into shape (2,)
"roi_intensities_x"
"custom_script",
"detector_name",
"gain_file",
"htype",
"pedestal_file",
"pulse_id",
"timestamp",
"type"
]
@@ -67,6 +71,10 @@ def accumulate(accumulator_addr, bsread_host, bsread_port, bsread_window):
if not sender:
continue
enable_bsread = results.get("enable_bsread", False)
if not enable_bsread:
continue
timestamp = tuple(results["timestamp"])
data = pack_bsread_data(results, detector, skip=ENTRIES_TO_SKIP)
sorter.add(pulse_id, (timestamp, data))

View File

@@ -2,6 +2,7 @@
from .addmask import calc_apply_additional_mask
from .addmaskfile import calc_apply_additional_mask_from_file
from .aggregation import calc_apply_aggregation
from .custom import calc_custom
from .jfdata import JFData
from .mask import calc_mask_pixels
from .peakfind import calc_peakfinder_analysis

View File

@@ -1,5 +1,5 @@
from .mask import calc_mask_pixels
from .thresh import threshold
from .thresh import calc_apply_threshold
def calc_apply_aggregation(results, data, pixel_mask, aggregator):
@@ -7,7 +7,7 @@ def calc_apply_aggregation(results, data, pixel_mask, aggregator):
if aggregator.is_ready():
aggregator.reset()
calc_apply_threshold(results, data) # changes data in place
calc_apply_threshold(results, data, value=0) # changes data in place
data = calc_aggregate(results, data, aggregator)
calc_mask_pixels(data, pixel_mask) # changes data in place
@@ -18,23 +18,6 @@ def calc_apply_aggregation(results, data, pixel_mask, aggregator):
#TODO: this is duplicated in calc_apply_threshold and calc_radial_integration
def calc_apply_threshold(results, data):
apply_threshold = results.get("apply_threshold", False)
if not apply_threshold:
return
for k in ("threshold_min", "threshold_max"):
if k not in results:
return
threshold_min = float(results["threshold_min"])
threshold_max = float(results["threshold_max"])
threshold(data, threshold_min, threshold_max, 0)
def calc_aggregate(results, data, aggregator):
apply_aggregation = results.get("apply_aggregation", False)
if not apply_aggregation:

67
dap/algos/custom.py Normal file
View File

@@ -0,0 +1,67 @@
import functools
import importlib.util as ilu
from pathlib import Path
from .utils import cooldown
BASE = "/gpfs/photonics/swissfel/buffer/dap/custom_dap_scripts"
BASE = Path(BASE)
def calc_custom(results, image, pixel_mask):
script = results.get("custom_script")
if not script:
return
try:
func = load_custom(script)
res = func(results, image, pixel_mask)
except Exception as e:
en = type(e).__name__
error = f"{en}: {e}"
return {"custom_script_error": error}
if isinstance(res, dict):
res = {f"{script}:{k}": v for k, v in res.items()}
else:
res = {script: res}
results.update(res)
@functools.cache
@cooldown(60)
def load_custom(script):
beamline, name = script.split(":")
ext = ".py"
if not name.endswith(ext):
name += ext
fn = BASE / beamline / name
return load_proc(fn)
def load_proc(fn):
mod = load_module(fn)
proc_func_name = "proc"
mod_name = mod.__name__
func = getattr(mod, proc_func_name, None) or getattr(mod, mod_name, None)
if func is None:
raise AttributeError(f'module "{mod_name}" contains neither "{proc_func_name}" nor "{mod_name}" function')
return func
def load_module(file_path, module_name=None):
module_name = module_name or Path(file_path).stem
spec = ilu.spec_from_file_location(module_name, file_path)
module = ilu.module_from_spec(spec)
spec.loader.exec_module(module)
return module

View File

@@ -1,6 +1,6 @@
import numpy as np
from .thresh import threshold
from .thresh import calc_apply_threshold
from .utils import npmemo
@@ -17,7 +17,7 @@ def calc_radial_integration(results, data, pixel_mask):
r_min = min(rad)
r_max = max(rad) + 1
data = calc_apply_threshold(results, data)
data = calc_apply_threshold(results, data, value=np.nan, copy=True)
rp = radial_profile(data, rad, norm, pixel_mask)
@@ -33,7 +33,7 @@ def calc_radial_integration(results, data, pixel_mask):
rp = rp / integral_silent_region
results["radint_normalised"] = [silent_min, silent_max]
results["radint_I"] = rp[r_min:].tolist() #TODO: why not stop at r_max?
results["radint_I"] = rp[r_min:] #TODO: why not stop at r_max?
results["radint_q"] = [r_min, r_max]
@@ -59,24 +59,3 @@ def radial_profile(data, rad, norm, keep_pixels):
#TODO: this is duplicated in calc_apply_threshold and calc_apply_aggregation
def calc_apply_threshold(results, data):
apply_threshold = results.get("apply_threshold", False)
if not apply_threshold:
return data
for k in ("threshold_min", "threshold_max"):
if k not in results:
return data
data = data.copy() # do the following in-place changes on a copy
threshold_min = float(results["threshold_min"])
threshold_max = float(results["threshold_max"])
threshold(data, threshold_min, threshold_max, np.nan)
return data

View File

@@ -40,7 +40,7 @@ def calc_roi(results, data):
roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized
roi_indices_x = [ix1, ix2]
roi_proj_x = np.nansum(data_roi, axis=0).tolist()
roi_proj_x = np.nansum(data_roi, axis=0)
roi_intensities.append(roi_sum)
roi_intensities_normalised.append(roi_sum_norm)

View File

@@ -36,7 +36,7 @@ def calc_spi_analysis(results, data):
hit = (photon_percentage > spi_threshold_hit_percentage)
results["number_of_spots"] = photon_percentage
results["is_hit_frame"] = bool(hit) # json does not like numpy bool_ scalars
results["is_hit_frame"] = hit

View File

@@ -205,17 +205,16 @@ def _calc_streakfinder_analysis(results, snr, mask):
streak_lengths = np.sqrt(
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2) +
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2)
).tolist()
)
streak_lines = streak_lines.T
_, number_of_streaks = streak_lines.shape
list_result = streak_lines.tolist() # arr(4, n_lines); coord x0, y0, x1, y1
bragg_counts = [streak.total_mass() for streak in detected_streaks]
results["number_of_streaks"] = number_of_streaks
results["is_hit_frame"] = (number_of_streaks > min_hit_streaks)
results["streaks"] = list_result
results["streaks"] = streak_lines # arr(4, n_lines); coord x0, y0, x1, y1
results["streak_lengths"] = streak_lengths
results["bragg_counts"] = bragg_counts

View File

@@ -1,34 +1,41 @@
import numpy as np
#TODO: this is duplicated in calc_radial_integration and calc_apply_aggregation
def calc_apply_threshold(results, data):
def calc_apply_threshold(results, data, value=None, copy=False):
apply_threshold = results.get("apply_threshold", False)
if not apply_threshold:
return
return data
for k in ("threshold_min", "threshold_max"):
if k not in results:
return
threshold_min = results.get("threshold_min")
threshold_max = results.get("threshold_max")
threshold_value_choice = results.get("threshold_value", "NaN")
threshold_value = 0 if threshold_value_choice == "0" else np.nan #TODO
if threshold_min is None and threshold_max is None:
return data
threshold_min = float(results["threshold_min"])
threshold_max = float(results["threshold_max"])
if value is None:
threshold_value = results.get("threshold_value", "NaN")
value = 0 if threshold_value == "0" else np.nan #TODO
threshold(data, threshold_min, threshold_max, threshold_value)
if copy:
data = data.copy() # do the following in-place changes on a copy
threshold(data, threshold_min, threshold_max, value)
return data
def threshold(data, vmin, vmax, replacement):
"""
threshold data in place by replacing values < vmin and values > vmax with replacement
threshold data in place by replacing values < vmin and values >= vmax with replacement
"""
# if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed
vmin, vmax = sorted((vmin, vmax))
data[data < vmin] = replacement
data[data > vmax] = replacement
if vmin is not None and vmax is not None:
# if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed
vmin, vmax = sorted((vmin, vmax))
if vmin is not None:
data[data < vmin] = replacement
if vmax is not None:
data[data >= vmax] = replacement

View File

@@ -1,4 +1,5 @@
from .cooldown import cooldown
from .npmemo import npmemo

View File

@@ -0,0 +1,84 @@
import functools
from time import time
class cooldown:
def __init__(self, seconds):
self.seconds = seconds
self.last_fail_time = 0
self.last_fail_exc = None
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
remaining = self.get_remaining()
if remaining > 0:
raise CooldownActive(func, args, kwargs, remaining) from self.last_fail_exc
try:
return func(*args, **kwargs)
except Exception as exc:
self.last_fail_time = time()
self.last_fail_exc = exc
raise exc
return wrapper
def get_remaining(self):
now = time()
elapsed = now - self.last_fail_time
return self.seconds - elapsed
class CooldownActive(Exception):
def __init__(self, func, args, kwargs, remaining):
call = format_call(func, args, kwargs)
msg = f"function call is on cooldown ({remaining:.3g}s remaining): {call}"
super().__init__(msg)
def format_call(func, args, kwargs):
fargs = [truncated_repr(i) for i in args]
fargs += [f"{k}={truncated_repr(v)}" for k, v in kwargs.items()]
fargs = ", ".join(fargs)
return f"{func.__name__}({fargs})"
def truncated_repr(x, length=10, ellipsis="..."):
thresh = 2 * length + len(ellipsis)
x = repr(x)
if len(x) <= thresh:
return x
return x[:length] + ellipsis + x[-length:]
if __name__ == "__main__":
from time import sleep
@cooldown(seconds=5)
def load(fn):
return open(fn).read()
print(load(__file__))
while True:
try:
load("does_not_exist.txt")
except Exception as e:
en = type(e).__name__
print(f"{en}: {e}")
sleep(0.1)

View File

@@ -4,6 +4,7 @@ from .bsreadext import make_bsread_sender, pack_bsread_data
from .bits import read_bit
from .bufjson import BufferedJSON
from .filehandler import FileHandler
from .jsonext import ExtendedJSONEncoder
from .randskip import randskip
from .sorter import Sorter
from .timestamp import make_bsread_timestamp

View File

@@ -1,4 +1,5 @@
from bsread.sender import Sender, PUB
import numpy as np
from bsread import Sender, PUB
def make_bsread_sender(host="*", port=None):
@@ -17,11 +18,22 @@ def pack_bsread_data(orig, prefix, skip=None):
if k in skip:
continue
if isinstance(v, bool):
# bsread expects bools as ints
v = int(v)
elif isinstance(v, list) and not v:
v = None
elif isinstance(v, list):
# bsread fails for empty lists and non-1D lists
v = list_to_array(v)
data[f"{prefix}:{k}"] = v
return data
def list_to_array(x):
try:
# let numpy figure out the dtype
return np.array(x)
except ValueError:
# the above fails for ragged lists but bsread also cannot handle object arrays
return None

22
dap/utils/jsonext.py Normal file
View File

@@ -0,0 +1,22 @@
import json
import numpy as np
from pathlib import Path
class ExtendedJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.generic): # covers all numpy scalars
return obj.item()
elif isinstance(obj, complex): # covers builtin complex
return {"real": obj.real, "imag": obj.imag}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, set):
return sorted(obj)
return super().default(obj)

View File

@@ -5,7 +5,7 @@ import numpy as np
from logzero import logger as log
from algos import (
calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis,
calc_apply_aggregation, calc_apply_threshold, calc_custom, calc_mask_pixels, calc_peakfinder_analysis,
calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData
)
from utils import Aggregator, BufferedJSON, make_bsread_timestamp, randskip, read_bit
@@ -110,8 +110,8 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_fra
if pixel_mask is not None:
saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels)
results["saturated_pixels"] = len(saturated_pixels_x)
results["saturated_pixels_x"] = saturated_pixels_x.tolist()
results["saturated_pixels_y"] = saturated_pixels_y.tolist()
results["saturated_pixels_x"] = saturated_pixels_x
results["saturated_pixels_y"] = saturated_pixels_y
calc_radial_integration(results, image, pixel_mask)
@@ -123,6 +123,7 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_fra
calc_roi(results, pfimage)
calc_spi_analysis(results, pfimage)
calc_peakfinder_analysis(results, pfimage, pixel_mask)
calc_custom(results, pfimage, pixel_mask)
# streak finder for convergent-beam diffraction experiments
# changes image and pixel_mask in place if do_snr=True in parameters file

View File

@@ -1,6 +1,8 @@
import numpy as np
import zmq
from utils import ExtendedJSONEncoder
FLAGS = 0
@@ -61,10 +63,10 @@ class ZMQSocketsWorker:
def send_accumulator(self, results):
self.accumulator_socket.send_json(results, FLAGS)
self.accumulator_socket.send_json(results, FLAGS, cls=ExtendedJSONEncoder)
def send_visualisation(self, results, data):
self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE)
self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE, cls=ExtendedJSONEncoder)
self.visualisation_socket.send(data, FLAGS, copy=True, track=True)