Compare commits

...

23 Commits

Author SHA1 Message Date
8f23cc8110 added switch to enable/disable bsread sender 2026-02-06 12:10:59 +01:00
a58015b278 peak_list_* is not a numpy array 2026-02-05 18:36:36 +01:00
aef6933fab adjust to new import logic 2025-11-24 19:19:08 +01:00
a89a6aded6 handle equals case correctly (i.e., like indices) 2025-11-19 18:53:40 +01:00
4de0c8b7d7 allow vmin/vmax = None for "no limit" 2025-11-19 18:53:33 +01:00
ee79ee98a0 actually do send Nones 2025-11-04 11:32:15 +01:00
c58fbbb8fa added another comment 2025-11-04 11:01:22 +01:00
5273e0cf46 do not send Nones 2025-11-04 10:37:12 +01:00
e514061173 added type 2025-11-04 10:34:17 +01:00
8e1073cfd1 removed skipping erroring cases; added skipping constant entries 2025-11-04 10:24:49 +01:00
17a739263d bsread cannot handle object arrays 2025-11-03 22:10:45 +01:00
a69b795450 convert lists to numpy arrays (in order to handle non-1D lists) 2025-11-03 21:31:09 +01:00
10701994ec removed all manual array.tolist() conversions (and similar) that were needed for json serializability 2025-11-03 19:45:37 +01:00
deb9253acf this is not a package 2025-11-03 16:57:09 +01:00
cce141423d use ExtendedJSONEncoder 2025-11-03 16:39:47 +01:00
941ab51856 added ExtendedJSONEncoder 2025-11-03 16:39:40 +01:00
aac524dd36 cooldown of a minute is enough; cache outside cooldown (once cached, no cooldown check needed anymore) 2025-11-03 09:41:13 +01:00
6a441e8c63 added cooldown to avoid checking for a non-existent file at 100 Hz 2025-11-02 16:50:16 +01:00
46fca4a6b5 added custom script 2025-11-01 20:10:23 +01:00
28f7248500 de-duplicated calc_apply_threshold 2025-10-31 18:38:58 +01:00
603f9215e8 added replacement value as argument 2025-10-31 18:35:11 +01:00
36c61c7b8e added boolean copy switch 2025-10-31 18:32:07 +01:00
32adb370cd order 2025-10-31 18:20:51 +01:00
16 changed files with 243 additions and 76 deletions

View File

@@ -9,10 +9,14 @@ from zmqsocks import ZMQSocketsAccumulator, make_address
OUTPUT_DIR = "/gpfs/photonics/swissfel/buffer/dap/data" OUTPUT_DIR = "/gpfs/photonics/swissfel/buffer/dap/data"
ENTRIES_TO_SKIP = [ ENTRIES_TO_SKIP = [
# send: ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (2,) + inhomogeneous part. "custom_script",
"roi_intensities_proj_x", "detector_name",
# recv: ValueError: cannot reshape array of size 4 into shape (2,) "gain_file",
"roi_intensities_x" "htype",
"pedestal_file",
"pulse_id",
"timestamp",
"type"
] ]
@@ -67,6 +71,10 @@ def accumulate(accumulator_addr, bsread_host, bsread_port, bsread_window):
if not sender: if not sender:
continue continue
enable_bsread = results.get("enable_bsread", False)
if not enable_bsread:
continue
timestamp = tuple(results["timestamp"]) timestamp = tuple(results["timestamp"])
data = pack_bsread_data(results, detector, skip=ENTRIES_TO_SKIP) data = pack_bsread_data(results, detector, skip=ENTRIES_TO_SKIP)
sorter.add(pulse_id, (timestamp, data)) sorter.add(pulse_id, (timestamp, data))

View File

@@ -2,6 +2,7 @@
from .addmask import calc_apply_additional_mask from .addmask import calc_apply_additional_mask
from .addmaskfile import calc_apply_additional_mask_from_file from .addmaskfile import calc_apply_additional_mask_from_file
from .aggregation import calc_apply_aggregation from .aggregation import calc_apply_aggregation
from .custom import calc_custom
from .jfdata import JFData from .jfdata import JFData
from .mask import calc_mask_pixels from .mask import calc_mask_pixels
from .peakfind import calc_peakfinder_analysis from .peakfind import calc_peakfinder_analysis

View File

@@ -1,5 +1,5 @@
from .mask import calc_mask_pixels from .mask import calc_mask_pixels
from .thresh import threshold from .thresh import calc_apply_threshold
def calc_apply_aggregation(results, data, pixel_mask, aggregator): def calc_apply_aggregation(results, data, pixel_mask, aggregator):
@@ -7,7 +7,7 @@ def calc_apply_aggregation(results, data, pixel_mask, aggregator):
if aggregator.is_ready(): if aggregator.is_ready():
aggregator.reset() aggregator.reset()
calc_apply_threshold(results, data) # changes data in place calc_apply_threshold(results, data, value=0) # changes data in place
data = calc_aggregate(results, data, aggregator) data = calc_aggregate(results, data, aggregator)
calc_mask_pixels(data, pixel_mask) # changes data in place calc_mask_pixels(data, pixel_mask) # changes data in place
@@ -18,23 +18,6 @@ def calc_apply_aggregation(results, data, pixel_mask, aggregator):
#TODO: this is duplicated in calc_apply_threshold and calc_radial_integration
def calc_apply_threshold(results, data):
apply_threshold = results.get("apply_threshold", False)
if not apply_threshold:
return
for k in ("threshold_min", "threshold_max"):
if k not in results:
return
threshold_min = float(results["threshold_min"])
threshold_max = float(results["threshold_max"])
threshold(data, threshold_min, threshold_max, 0)
def calc_aggregate(results, data, aggregator): def calc_aggregate(results, data, aggregator):
apply_aggregation = results.get("apply_aggregation", False) apply_aggregation = results.get("apply_aggregation", False)
if not apply_aggregation: if not apply_aggregation:

67
dap/algos/custom.py Normal file
View File

@@ -0,0 +1,67 @@
import functools
import importlib.util as ilu
from pathlib import Path
from .utils import cooldown
BASE = "/gpfs/photonics/swissfel/buffer/dap/custom_dap_scripts"
BASE = Path(BASE)
def calc_custom(results, image, pixel_mask):
script = results.get("custom_script")
if not script:
return
try:
func = load_custom(script)
res = func(results, image, pixel_mask)
except Exception as e:
en = type(e).__name__
error = f"{en}: {e}"
return {"custom_script_error": error}
if isinstance(res, dict):
res = {f"{script}:{k}": v for k, v in res.items()}
else:
res = {script: res}
results.update(res)
@functools.cache
@cooldown(60)
def load_custom(script):
beamline, name = script.split(":")
ext = ".py"
if not name.endswith(ext):
name += ext
fn = BASE / beamline / name
return load_proc(fn)
def load_proc(fn):
mod = load_module(fn)
proc_func_name = "proc"
mod_name = mod.__name__
func = getattr(mod, proc_func_name, None) or getattr(mod, mod_name, None)
if func is None:
raise AttributeError(f'module "{mod_name}" contains neither "{proc_func_name}" nor "{mod_name}" function')
return func
def load_module(file_path, module_name=None):
module_name = module_name or Path(file_path).stem
spec = ilu.spec_from_file_location(module_name, file_path)
module = ilu.module_from_spec(spec)
spec.loader.exec_module(module)
return module

View File

@@ -1,6 +1,6 @@
import numpy as np import numpy as np
from .thresh import threshold from .thresh import calc_apply_threshold
from .utils import npmemo from .utils import npmemo
@@ -17,7 +17,7 @@ def calc_radial_integration(results, data, pixel_mask):
r_min = min(rad) r_min = min(rad)
r_max = max(rad) + 1 r_max = max(rad) + 1
data = calc_apply_threshold(results, data) data = calc_apply_threshold(results, data, value=np.nan, copy=True)
rp = radial_profile(data, rad, norm, pixel_mask) rp = radial_profile(data, rad, norm, pixel_mask)
@@ -33,7 +33,7 @@ def calc_radial_integration(results, data, pixel_mask):
rp = rp / integral_silent_region rp = rp / integral_silent_region
results["radint_normalised"] = [silent_min, silent_max] results["radint_normalised"] = [silent_min, silent_max]
results["radint_I"] = rp[r_min:].tolist() #TODO: why not stop at r_max? results["radint_I"] = rp[r_min:] #TODO: why not stop at r_max?
results["radint_q"] = [r_min, r_max] results["radint_q"] = [r_min, r_max]
@@ -59,24 +59,3 @@ def radial_profile(data, rad, norm, keep_pixels):
#TODO: this is duplicated in calc_apply_threshold and calc_apply_aggregation
def calc_apply_threshold(results, data):
apply_threshold = results.get("apply_threshold", False)
if not apply_threshold:
return data
for k in ("threshold_min", "threshold_max"):
if k not in results:
return data
data = data.copy() # do the following in-place changes on a copy
threshold_min = float(results["threshold_min"])
threshold_max = float(results["threshold_max"])
threshold(data, threshold_min, threshold_max, np.nan)
return data

View File

@@ -40,7 +40,7 @@ def calc_roi(results, data):
roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized roi_sum_norm = np.nanmean(data_roi, dtype=float) # data_roi is np.float32, which cannot be json serialized
roi_indices_x = [ix1, ix2] roi_indices_x = [ix1, ix2]
roi_proj_x = np.nansum(data_roi, axis=0).tolist() roi_proj_x = np.nansum(data_roi, axis=0)
roi_intensities.append(roi_sum) roi_intensities.append(roi_sum)
roi_intensities_normalised.append(roi_sum_norm) roi_intensities_normalised.append(roi_sum_norm)

View File

@@ -36,7 +36,7 @@ def calc_spi_analysis(results, data):
hit = (photon_percentage > spi_threshold_hit_percentage) hit = (photon_percentage > spi_threshold_hit_percentage)
results["number_of_spots"] = photon_percentage results["number_of_spots"] = photon_percentage
results["is_hit_frame"] = bool(hit) # json does not like numpy bool_ scalars results["is_hit_frame"] = hit

View File

@@ -205,17 +205,16 @@ def _calc_streakfinder_analysis(results, snr, mask):
streak_lengths = np.sqrt( streak_lengths = np.sqrt(
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2) + np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2) +
np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2) np.power((streak_lines[..., 2] - streak_lines[..., 0]), 2)
).tolist() )
streak_lines = streak_lines.T streak_lines = streak_lines.T
_, number_of_streaks = streak_lines.shape _, number_of_streaks = streak_lines.shape
list_result = streak_lines.tolist() # arr(4, n_lines); coord x0, y0, x1, y1
bragg_counts = [streak.total_mass() for streak in detected_streaks] bragg_counts = [streak.total_mass() for streak in detected_streaks]
results["number_of_streaks"] = number_of_streaks results["number_of_streaks"] = number_of_streaks
results["is_hit_frame"] = (number_of_streaks > min_hit_streaks) results["is_hit_frame"] = (number_of_streaks > min_hit_streaks)
results["streaks"] = list_result results["streaks"] = streak_lines # arr(4, n_lines); coord x0, y0, x1, y1
results["streak_lengths"] = streak_lengths results["streak_lengths"] = streak_lengths
results["bragg_counts"] = bragg_counts results["bragg_counts"] = bragg_counts

View File

@@ -1,34 +1,41 @@
import numpy as np import numpy as np
#TODO: this is duplicated in calc_radial_integration and calc_apply_aggregation def calc_apply_threshold(results, data, value=None, copy=False):
def calc_apply_threshold(results, data):
apply_threshold = results.get("apply_threshold", False) apply_threshold = results.get("apply_threshold", False)
if not apply_threshold: if not apply_threshold:
return return data
for k in ("threshold_min", "threshold_max"): threshold_min = results.get("threshold_min")
if k not in results: threshold_max = results.get("threshold_max")
return
threshold_value_choice = results.get("threshold_value", "NaN") if threshold_min is None and threshold_max is None:
threshold_value = 0 if threshold_value_choice == "0" else np.nan #TODO return data
threshold_min = float(results["threshold_min"]) if value is None:
threshold_max = float(results["threshold_max"]) threshold_value = results.get("threshold_value", "NaN")
value = 0 if threshold_value == "0" else np.nan #TODO
threshold(data, threshold_min, threshold_max, threshold_value) if copy:
data = data.copy() # do the following in-place changes on a copy
threshold(data, threshold_min, threshold_max, value)
return data
def threshold(data, vmin, vmax, replacement): def threshold(data, vmin, vmax, replacement):
""" """
threshold data in place by replacing values < vmin and values > vmax with replacement threshold data in place by replacing values < vmin and values >= vmax with replacement
""" """
# if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed if vmin is not None and vmax is not None:
vmin, vmax = sorted((vmin, vmax)) # if vmin > vmax, data will be overwritten entirely -- better to ensure vmin < vmax by switching them if needed
data[data < vmin] = replacement vmin, vmax = sorted((vmin, vmax))
data[data > vmax] = replacement if vmin is not None:
data[data < vmin] = replacement
if vmax is not None:
data[data >= vmax] = replacement

View File

@@ -1,4 +1,5 @@
from .cooldown import cooldown
from .npmemo import npmemo from .npmemo import npmemo

View File

@@ -0,0 +1,84 @@
import functools
from time import time
class cooldown:
def __init__(self, seconds):
self.seconds = seconds
self.last_fail_time = 0
self.last_fail_exc = None
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
remaining = self.get_remaining()
if remaining > 0:
raise CooldownActive(func, args, kwargs, remaining) from self.last_fail_exc
try:
return func(*args, **kwargs)
except Exception as exc:
self.last_fail_time = time()
self.last_fail_exc = exc
raise exc
return wrapper
def get_remaining(self):
now = time()
elapsed = now - self.last_fail_time
return self.seconds - elapsed
class CooldownActive(Exception):
def __init__(self, func, args, kwargs, remaining):
call = format_call(func, args, kwargs)
msg = f"function call is on cooldown ({remaining:.3g}s remaining): {call}"
super().__init__(msg)
def format_call(func, args, kwargs):
fargs = [truncated_repr(i) for i in args]
fargs += [f"{k}={truncated_repr(v)}" for k, v in kwargs.items()]
fargs = ", ".join(fargs)
return f"{func.__name__}({fargs})"
def truncated_repr(x, length=10, ellipsis="..."):
thresh = 2 * length + len(ellipsis)
x = repr(x)
if len(x) <= thresh:
return x
return x[:length] + ellipsis + x[-length:]
if __name__ == "__main__":
from time import sleep
@cooldown(seconds=5)
def load(fn):
return open(fn).read()
print(load(__file__))
while True:
try:
load("does_not_exist.txt")
except Exception as e:
en = type(e).__name__
print(f"{en}: {e}")
sleep(0.1)

View File

@@ -4,6 +4,7 @@ from .bsreadext import make_bsread_sender, pack_bsread_data
from .bits import read_bit from .bits import read_bit
from .bufjson import BufferedJSON from .bufjson import BufferedJSON
from .filehandler import FileHandler from .filehandler import FileHandler
from .jsonext import ExtendedJSONEncoder
from .randskip import randskip from .randskip import randskip
from .sorter import Sorter from .sorter import Sorter
from .timestamp import make_bsread_timestamp from .timestamp import make_bsread_timestamp

View File

@@ -1,4 +1,5 @@
from bsread.sender import Sender, PUB import numpy as np
from bsread import Sender, PUB
def make_bsread_sender(host="*", port=None): def make_bsread_sender(host="*", port=None):
@@ -17,11 +18,22 @@ def pack_bsread_data(orig, prefix, skip=None):
if k in skip: if k in skip:
continue continue
if isinstance(v, bool): if isinstance(v, bool):
# bsread expects bools as ints
v = int(v) v = int(v)
elif isinstance(v, list) and not v: elif isinstance(v, list):
v = None # bsread fails for empty lists and non-1D lists
v = list_to_array(v)
data[f"{prefix}:{k}"] = v data[f"{prefix}:{k}"] = v
return data return data
def list_to_array(x):
try:
# let numpy figure out the dtype
return np.array(x)
except ValueError:
# the above fails for ragged lists but bsread also cannot handle object arrays
return None

22
dap/utils/jsonext.py Normal file
View File

@@ -0,0 +1,22 @@
import json
import numpy as np
from pathlib import Path
class ExtendedJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, np.generic): # covers all numpy scalars
return obj.item()
elif isinstance(obj, complex): # covers builtin complex
return {"real": obj.real, "imag": obj.imag}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, set):
return sorted(obj)
return super().default(obj)

View File

@@ -5,7 +5,7 @@ import numpy as np
from logzero import logger as log from logzero import logger as log
from algos import ( from algos import (
calc_apply_aggregation, calc_apply_threshold, calc_mask_pixels, calc_peakfinder_analysis, calc_apply_aggregation, calc_apply_threshold, calc_custom, calc_mask_pixels, calc_peakfinder_analysis,
calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData calc_radial_integration, calc_roi, calc_spi_analysis, calc_streakfinder_analysis, JFData
) )
from utils import Aggregator, BufferedJSON, make_bsread_timestamp, randskip, read_bit from utils import Aggregator, BufferedJSON, make_bsread_timestamp, randskip, read_bit
@@ -110,8 +110,8 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_fra
if pixel_mask is not None: if pixel_mask is not None:
saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels) saturated_pixels_y, saturated_pixels_x = jfdata.get_saturated_pixels(raw_image, double_pixels)
results["saturated_pixels"] = len(saturated_pixels_x) results["saturated_pixels"] = len(saturated_pixels_x)
results["saturated_pixels_x"] = saturated_pixels_x.tolist() results["saturated_pixels_x"] = saturated_pixels_x
results["saturated_pixels_y"] = saturated_pixels_y.tolist() results["saturated_pixels_y"] = saturated_pixels_y
calc_radial_integration(results, image, pixel_mask) calc_radial_integration(results, image, pixel_mask)
@@ -123,6 +123,7 @@ def work(backend_addr, accumulator_addr, visualisation_addr, fn_config, skip_fra
calc_roi(results, pfimage) calc_roi(results, pfimage)
calc_spi_analysis(results, pfimage) calc_spi_analysis(results, pfimage)
calc_peakfinder_analysis(results, pfimage, pixel_mask) calc_peakfinder_analysis(results, pfimage, pixel_mask)
calc_custom(results, pfimage, pixel_mask)
# streak finder for convergent-beam diffraction experiments # streak finder for convergent-beam diffraction experiments
# changes image and pixel_mask in place if do_snr=True in parameters file # changes image and pixel_mask in place if do_snr=True in parameters file

View File

@@ -1,6 +1,8 @@
import numpy as np import numpy as np
import zmq import zmq
from utils import ExtendedJSONEncoder
FLAGS = 0 FLAGS = 0
@@ -61,10 +63,10 @@ class ZMQSocketsWorker:
def send_accumulator(self, results): def send_accumulator(self, results):
self.accumulator_socket.send_json(results, FLAGS) self.accumulator_socket.send_json(results, FLAGS, cls=ExtendedJSONEncoder)
def send_visualisation(self, results, data): def send_visualisation(self, results, data):
self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE) self.visualisation_socket.send_json(results, FLAGS | zmq.SNDMORE, cls=ExtendedJSONEncoder)
self.visualisation_socket.send(data, FLAGS, copy=True, track=True) self.visualisation_socket.send(data, FLAGS, copy=True, track=True)