Compare commits

...

5 Commits

Author SHA1 Message Date
leonarski_f cd5d97aa55 FileWriter: Fix
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 10m22s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 12m20s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 11m31s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 14m36s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m37s
Build Packages / build:rpm (rocky8) (push) Successful in 12m57s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 14m39s
Build Packages / build:rpm (rocky9) (push) Successful in 11m13s
Build Packages / Generate python client (push) Successful in 56s
Build Packages / Build documentation (push) Successful in 54s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (ubuntu2404) (push) Successful in 8m39s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 9m57s
Build Packages / XDS test (neggia plugin) (push) Successful in 7m28s
Build Packages / XDS test (durin plugin) (push) Successful in 9m6s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m2s
Build Packages / DIALS test (push) Successful in 12m32s
Build Packages / Unit tests (push) Successful in 1h0m13s
2026-05-08 13:34:12 +02:00
leonarski_f 6c8c953c92 jfjoch_process: Cleanup help + by default save HDF5, but without analysis results
Build Packages / Unit tests (push) Failing after 7m55s
Build Packages / build:rpm (rocky8_nocuda) (push) Failing after 8m59s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Failing after 9m12s
Build Packages / build:rpm (rocky9_nocuda) (push) Failing after 10m20s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Failing after 10m21s
Build Packages / build:rpm (rocky9_sls9) (push) Failing after 11m38s
Build Packages / build:rpm (rocky8_sls9) (push) Failing after 11m40s
Build Packages / build:rpm (rocky8) (push) Failing after 11m47s
Build Packages / Generate python client (push) Successful in 1m35s
Build Packages / Build documentation (push) Successful in 2m4s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9) (push) Failing after 7m51s
Build Packages / DIALS test (push) Failing after 6m32s
Build Packages / build:rpm (ubuntu2204) (push) Failing after 8m14s
Build Packages / build:rpm (ubuntu2404) (push) Failing after 9m55s
Build Packages / XDS test (durin plugin) (push) Failing after 8m44s
Build Packages / XDS test (neggia plugin) (push) Failing after 8m53s
Build Packages / XDS test (JFJoch plugin) (push) Failing after 9m18s
2026-05-08 13:32:09 +02:00
leonarski_f 173198be40 HDF5DataFile: Include File name explicitly + FileWriter: Handle NXmxIntegrated in a smarter way
Build Packages / Unit tests (push) Failing after 8m37s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Failing after 9m50s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Failing after 9m49s
Build Packages / build:rpm (rocky8_nocuda) (push) Failing after 9m55s
Build Packages / build:rpm (rocky9_nocuda) (push) Failing after 10m20s
Build Packages / build:rpm (rocky8) (push) Failing after 11m27s
Build Packages / build:rpm (rocky9_sls9) (push) Failing after 11m37s
Build Packages / build:rpm (rocky8_sls9) (push) Failing after 11m40s
Build Packages / Generate python client (push) Successful in 1m27s
Build Packages / Build documentation (push) Successful in 2m17s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9) (push) Failing after 7m46s
Build Packages / build:rpm (ubuntu2204) (push) Failing after 7m46s
Build Packages / DIALS test (push) Failing after 7m53s
Build Packages / XDS test (durin plugin) (push) Failing after 8m6s
Build Packages / XDS test (JFJoch plugin) (push) Failing after 6m51s
Build Packages / XDS test (neggia plugin) (push) Failing after 6m47s
Build Packages / build:rpm (ubuntu2404) (push) Failing after 9m21s
2026-05-08 13:06:44 +02:00
leonarski_f 930cfb0b35 HDF5DataFile: Saving images is a configurable option 2026-05-08 12:31:56 +02:00
leonarski_f 75f1c5f954 SHIM library improvements from the HDF Group
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 13m40s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 15m26s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 17m15s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 17m22s
Build Packages / build:rpm (rocky8) (push) Successful in 17m28s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 17m42s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 18m32s
Build Packages / build:rpm (rocky9) (push) Successful in 10m0s
Build Packages / Generate python client (push) Successful in 43s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 9m31s
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 57s
Build Packages / XDS test (neggia plugin) (push) Successful in 9m46s
Build Packages / XDS test (durin plugin) (push) Successful in 11m1s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 10m54s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 11m58s
Build Packages / DIALS test (push) Successful in 13m41s
Build Packages / Unit tests (push) Successful in 1h1m14s
2026-05-08 11:39:51 +02:00
12 changed files with 805 additions and 184 deletions
+1
View File
@@ -251,6 +251,7 @@ struct StartMessage {
std::optional<float> attenuator_transmission;
std::optional<bool> write_master_file;
std::optional<bool> write_images;
nlohmann::json user_data;
+1
View File
@@ -96,6 +96,7 @@ There are minor differences at the moment:
| | | type "azim": qmin, qmax (numbers) | |
| - gain_file_names | Array(string) | Names of JUNGFRAU gain files used for the current detector | |
| - write_master_file | bool | With multiple sockets, it selects which socket will provide master file | |
| - write_images | bool | Write images in the HDF5 file (if false, will only write metadata) | |
| - data_reduction_factor_serialmx | uint64 | Data reduction factor for serial MX | |
| - experiment_group | string | ID of instrument user, e.g., p-group (SLS/SwissFEL) or proposal number | |
| - jfjoch_release | string | Jungfraujoch release number | |
@@ -1012,6 +1012,8 @@ namespace {
ProcessROIConfig(message, j["roi"]);
if (j.contains("gain_file_names"))
message.gain_file_names = j["gain_file_names"];
if (j.contains("write_images"))
message.write_images = j["write_images"];
if (j.contains("write_master_file"))
message.write_master_file = j["write_master_file"];
if (j.contains("data_reduction_factor_serialmx"))
@@ -493,6 +493,8 @@ inline void CBOR_ENC_START_USER_DATA(CborEncoder& encoder, const char* key,
j["gain_file_names"] = message.gain_file_names;
if (message.write_master_file)
j["write_master_file"] = message.write_master_file.value();
if (message.write_images)
j["write_images"] = message.write_images.value();
if (message.data_reduction_factor_serialmx)
j["data_reduction_factor_serialmx"] = message.data_reduction_factor_serialmx.value();
j["experiment_group"] = message.experiment_group;
+682 -102
View File
@@ -4,153 +4,733 @@
// This file may be used, modified, and distributed under either GPL-3.0-only
// or the HDF5 license, at the recipient's option.
// Kindly acknowledge modifications from the HDF5 Group
/*
* enospc_shim — LD_PRELOAD shim that forces ENOSPC after a fixed write
* budget. Pair with the H5FDpoison_sec2 driver to drive HDF5's
* out-of-space recovery path against a real disk.
*
* Configuration:
* ENOSPC_AFTER (env var) — total write bytes allowed before subsequent
* writes/syncs return ENOSPC. Default 10 MiB.
* Read once at constructor time; use the
* runtime API below to change it per test.
*
* Runtime API (callable via dlsym after LD_PRELOAD):
* void enospc_shim_reset(size_t new_fail_after);
* Zero the byte counter. If new_fail_after != 0,
* also replace the cap.
* size_t enospc_shim_get_total(void);
* Bytes charged against the budget so far.
* size_t enospc_shim_get_fail_after(void);
* Current cap.
*
* Intercepts: write, pwrite, pwrite64, writev, pwritev, pwritev64,
* fallocate, fallocate64, posix_fallocate,
* posix_fallocate64, fsync, fdatasync, ftruncate,
* ftruncate64, dprintf, vdprintf, and relevant FORTIFY
* *_chk write/dprintf variants.
*
* Caveats:
* - mmap-backed I/O is not intercepted; do not test memory-mapped VFDs
* with this shim.
* - aio_write, copy_file_range, sendfile, and stdio FILE* output
* are not intercepted.
* - The byte budget is process-global, not per-fd. Mixing real I/O
* (logging, /dev/null, etc.) with the file under test will exhaust
* the budget early.
*/
#define _GNU_SOURCE
/*
* This interposer exports both default and *64 ELF symbols explicitly.
* Avoid header-level redirects that would rename the default definitions
* under -D_FILE_OFFSET_BITS=64 and collide with the explicit *64 wrappers.
*/
#ifdef _FILE_OFFSET_BITS
#undef _FILE_OFFSET_BITS
#endif
#ifdef _TIME_BITS
#undef _TIME_BITS
#endif
#include <dlfcn.h>
#include <errno.h>
#include <unistd.h>
#include <sys/uio.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <linux/falloc.h>
#include <pthread.h>
#include <stdint.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
// Function pointers to real syscalls
static ssize_t (*real_pwrite64) (int, const void *, size_t, __off64_t) = NULL;
static ssize_t (*real_write)(int, const void *, size_t) = NULL;
static ssize_t (*real_pwrite)(int, const void *, size_t, off_t) = NULL;
static ssize_t (*real_pwrite64)(int, const void *, size_t, off64_t) = NULL;
static ssize_t (*real_writev)(int, const struct iovec *, int) = NULL;
static ssize_t (*real_pwritev)(int, const struct iovec *, int, off_t) = NULL;
static ssize_t (*real_pwritev64)(int, const struct iovec *, int, off64_t) = NULL;
static int (*real_fsync)(int) = NULL;
static int (*real_fdatasync)(int) = NULL;
static int (*real_ftruncate)(int, off_t) = NULL;
static int (*real_ftruncate64)(int, off64_t) = NULL;
static int (*real_fallocate)(int, int, off_t, off_t) = NULL;
static int (*real_fallocate64)(int, int, off64_t, off64_t) = NULL;
static int (*real_posix_fallocate)(int, off_t, off_t) = NULL;
static int (*real_posix_fallocate64)(int, off64_t, off64_t) = NULL;
static int (*real___vasprintf_chk)(char **, int, const char *, va_list) = NULL;
// State
static size_t total_written = 0;
static size_t fail_after = 0;
static size_t fail_after = 10ULL * 1024 * 1024;
// Thread safety (important in real tests)
static pthread_once_t init_once = PTHREAD_ONCE_INIT;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// Initialize real symbols and config
#define LOAD_SYMBOL(function_pointer, symbol_name) \
(*(void **)(&(function_pointer)) = dlsym(RTLD_NEXT, (symbol_name)))
static void init_symbols(void) {
const char *env = getenv("ENOSPC_AFTER");
LOAD_SYMBOL(real_write, "write");
LOAD_SYMBOL(real_pwrite, "pwrite");
LOAD_SYMBOL(real_pwrite64, "pwrite64");
LOAD_SYMBOL(real_writev, "writev");
LOAD_SYMBOL(real_pwritev, "pwritev");
LOAD_SYMBOL(real_pwritev64, "pwritev64");
LOAD_SYMBOL(real_fsync, "fsync");
LOAD_SYMBOL(real_fdatasync, "fdatasync");
LOAD_SYMBOL(real_ftruncate, "ftruncate");
LOAD_SYMBOL(real_ftruncate64, "ftruncate64");
LOAD_SYMBOL(real_fallocate, "fallocate");
LOAD_SYMBOL(real_fallocate64, "fallocate64");
LOAD_SYMBOL(real_posix_fallocate, "posix_fallocate");
LOAD_SYMBOL(real_posix_fallocate64, "posix_fallocate64");
LOAD_SYMBOL(real___vasprintf_chk, "__vasprintf_chk");
if (env != NULL)
fail_after = strtoull(env, NULL, 10);
}
__attribute__((constructor))
static void init(void) {
real_pwrite64 = dlsym(RTLD_NEXT, "pwrite64");
const char* env = getenv("ENOSPC_AFTER");
if (env) {
fail_after = strtoull(env, NULL, 10);
} else {
fail_after = 10ULL * 1024 * 1024; // default: 10 MB
}
printf("ENOSPC shim loaded with limit %lu bytes\n", fail_after);
pthread_once(&init_once, init_symbols);
}
/*
// Common helper
static int should_fail(size_t upcoming) {
if (fail_after == 0) return 0;
* Reserve up to `requested` bytes against the global budget.
*
* - returns -1 with errno=ENOSPC if the budget is already exhausted.
* - on success, *allowed is the number of bytes the caller may now
* pass to the real write syscall. It may be smaller than requested,
* producing a deliberate short write that mirrors how a kernel
* reports a partial fill before the next call hits ENOSPC.
*/
static int reserve_bytes(size_t requested, size_t *allowed) {
pthread_mutex_lock(&lock);
if (total_written >= fail_after) return 1;
if (total_written + upcoming > fail_after) {
// Simulate partial write exhaustion:
// allow some bytes, then next call fails
if (requested == 0) {
*allowed = 0;
pthread_mutex_unlock(&lock);
return 0;
}
if (total_written >= fail_after) {
pthread_mutex_unlock(&lock);
errno = ENOSPC;
return -1;
}
*allowed = requested;
if (requested > fail_after - total_written)
*allowed = fail_after - total_written;
total_written += *allowed;
pthread_mutex_unlock(&lock);
return 0;
}
// ---- write ----
ssize_t write(int fd, const void *buf, size_t count) {
/*
* If the real syscall wrote fewer bytes than reserved (kernel short
* write or outright failure), refund the difference so the next caller
* gets a faithful budget. Without this, a transient short write would
* permanently shrink the effective limit below ENOSPC_AFTER.
*/
static void release_unused_reservation(size_t reserved, ssize_t written) {
size_t actual = written > 0 ? (size_t)written : 0;
size_t unused;
if (actual >= reserved)
return;
unused = reserved - actual;
pthread_mutex_lock(&lock);
printf("write %lu bytes\n", count);
if (total_written >= fail_after) {
pthread_mutex_unlock(&lock);
errno = ENOSPC;
return -1;
}
if (unused > total_written)
total_written = 0;
else
total_written -= unused;
pthread_mutex_unlock(&lock);
ssize_t ret = real_write(fd, buf, count);
if (ret > 0) {
pthread_mutex_lock(&lock);
total_written += ret;
pthread_mutex_unlock(&lock);
}
return ret;
}
// ---- pwrite ----
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) {
pthread_mutex_lock(&lock);
printf("pwrite %lu bytes\n", count);
if (total_written >= fail_after) {
pthread_mutex_unlock(&lock);
errno = ENOSPC;
return -1;
}
pthread_mutex_unlock(&lock);
ssize_t ret = real_pwrite(fd, buf, count, offset);
if (ret > 0) {
pthread_mutex_lock(&lock);
total_written += ret;
pthread_mutex_unlock(&lock);
}
return ret;
}
// ---- writev ----
ssize_t writev(int fd, const struct iovec *iov, int iovcnt) {
static size_t iov_total(const struct iovec *iov, int iovcnt) {
size_t total = 0;
for (int i = 0; i < iovcnt; i++) {
if (SIZE_MAX - total < iov[i].iov_len)
return SIZE_MAX;
total += iov[i].iov_len;
}
printf("writev %lu bytes\n", total);
return total;
}
/*
* Build a copy of an iovec truncated to at most `allowed` bytes total.
* Used by writev/pwritev/pwritev64 to deliver a controlled short write
* that consumes exactly the remaining budget. Caller frees the copy.
*/
static int trim_iov(
const struct iovec *iov,
int iovcnt,
size_t allowed,
struct iovec **trimmed_iov,
int *trimmed_iovcnt
) {
struct iovec *copy;
int used = 0;
if (iovcnt <= 0 || allowed == 0) {
*trimmed_iov = NULL;
*trimmed_iovcnt = 0;
return 0;
}
copy = malloc((size_t)iovcnt * sizeof(*copy));
if (copy == NULL)
return -1;
for (int i = 0; i < iovcnt && allowed > 0; i++) {
size_t len = iov[i].iov_len;
if (len > allowed)
len = allowed;
copy[used].iov_base = iov[i].iov_base;
copy[used].iov_len = len;
used++;
allowed -= len;
}
*trimmed_iov = copy;
*trimmed_iovcnt = used;
return 0;
}
/*
* fsync / fdatasync entry-check: once the byte budget is gone, sync
* fails with ENOSPC. Real filesystems can defer ENOSPC to fsync if
* delayed allocation is in play, so this models that behavior.
*/
static int fail_if_exhausted(void) {
pthread_mutex_lock(&lock);
if (total_written < fail_after) {
pthread_mutex_unlock(&lock);
return 0;
}
pthread_mutex_unlock(&lock);
errno = ENOSPC;
return -1;
}
static size_t current_fail_after(void) {
size_t cap;
pthread_mutex_lock(&lock);
cap = fail_after;
pthread_mutex_unlock(&lock);
return cap;
}
if (total_written >= fail_after) {
pthread_mutex_unlock(&lock);
static int positive_range_exceeds_budget(uintmax_t offset, uintmax_t len) {
uintmax_t end;
if (UINTMAX_MAX - offset < len)
return 1;
end = offset + len;
return end > (uintmax_t)current_fail_after();
}
static int signed_range_exceeds_budget(off64_t offset, off64_t len) {
if (offset < 0 || len < 0)
return 0;
return positive_range_exceeds_budget((uintmax_t)offset, (uintmax_t)len);
}
static int file_size_plus_len_exceeds_budget(int fd, off64_t len) {
struct stat st;
if (len <= 0)
return 0;
if (fstat(fd, &st) < 0)
return 0;
if (st.st_size < 0)
return 0;
return positive_range_exceeds_budget((uintmax_t)st.st_size, (uintmax_t)len);
}
static int fallocate_exceeds_budget(
int fd,
int mode,
off64_t offset,
off64_t len
) {
if (offset < 0 || len <= 0)
return 0;
if ((mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_COLLAPSE_RANGE)) != 0)
return 0;
if ((mode & FALLOC_FL_INSERT_RANGE) != 0)
return file_size_plus_len_exceeds_budget(fd, len);
return signed_range_exceeds_budget(offset, len);
}
static int truncate_length_exceeds_budget(off64_t length) {
if (length < 0)
return 0;
return positive_range_exceeds_budget((uintmax_t)length, 0);
}
/*
* write/pwrite/pwrite64 follow an identical pattern:
* 1. resolve the real symbol on first use,
* 2. reserve at most `count` bytes — return -1/ENOSPC if exhausted,
* 3. issue the real syscall on the trimmed length,
* 4. refund any portion the kernel ultimately did not write.
*
* The writev/pwritev/pwritev64 variants additionally call trim_iov() to
* truncate the iovec array to the reserved length.
*/
ssize_t write(int fd, const void *buf, size_t count) {
size_t allowed = 0;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_write == NULL) {
errno = ENOSYS;
return -1;
}
if (reserve_bytes(count, &allowed) < 0)
return -1;
ret = real_write(fd, buf, allowed);
release_unused_reservation(allowed, ret);
return ret;
}
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) {
size_t allowed = 0;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_pwrite == NULL) {
errno = ENOSYS;
return -1;
}
if (reserve_bytes(count, &allowed) < 0)
return -1;
ret = real_pwrite(fd, buf, allowed, offset);
release_unused_reservation(allowed, ret);
return ret;
}
ssize_t pwrite64(int fd, const void *buf, size_t count, off64_t offset) {
size_t allowed = 0;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_pwrite64 == NULL) {
errno = ENOSYS;
return -1;
}
if (reserve_bytes(count, &allowed) < 0)
return -1;
ret = real_pwrite64(fd, buf, allowed, offset);
release_unused_reservation(allowed, ret);
return ret;
}
ssize_t writev(int fd, const struct iovec *iov, int iovcnt) {
struct iovec *trimmed_iov = NULL;
int trimmed_iovcnt = 0;
size_t allowed = 0;
size_t requested;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_writev == NULL) {
errno = ENOSYS;
return -1;
}
if (iovcnt <= 0 || iov == NULL)
return real_writev(fd, iov, iovcnt);
requested = iov_total(iov, iovcnt);
if (reserve_bytes(requested, &allowed) < 0)
return -1;
if (trim_iov(iov, iovcnt, allowed, &trimmed_iov, &trimmed_iovcnt) < 0) {
release_unused_reservation(allowed, -1);
return -1;
}
ret = real_writev(fd, trimmed_iov, trimmed_iovcnt);
free(trimmed_iov);
release_unused_reservation(allowed, ret);
return ret;
}
ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt, off_t offset) {
struct iovec *trimmed_iov = NULL;
int trimmed_iovcnt = 0;
size_t allowed = 0;
size_t requested;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_pwritev == NULL) {
errno = ENOSYS;
return -1;
}
if (iovcnt <= 0 || iov == NULL)
return real_pwritev(fd, iov, iovcnt, offset);
requested = iov_total(iov, iovcnt);
if (reserve_bytes(requested, &allowed) < 0)
return -1;
if (trim_iov(iov, iovcnt, allowed, &trimmed_iov, &trimmed_iovcnt) < 0) {
release_unused_reservation(allowed, -1);
return -1;
}
ret = real_pwritev(fd, trimmed_iov, trimmed_iovcnt, offset);
free(trimmed_iov);
release_unused_reservation(allowed, ret);
return ret;
}
ssize_t pwritev64(int fd, const struct iovec *iov, int iovcnt, off64_t offset) {
struct iovec *trimmed_iov = NULL;
int trimmed_iovcnt = 0;
size_t allowed = 0;
size_t requested;
ssize_t ret;
pthread_once(&init_once, init_symbols);
if (real_pwritev64 == NULL) {
errno = ENOSYS;
return -1;
}
if (iovcnt <= 0 || iov == NULL)
return real_pwritev64(fd, iov, iovcnt, offset);
requested = iov_total(iov, iovcnt);
if (reserve_bytes(requested, &allowed) < 0)
return -1;
if (trim_iov(iov, iovcnt, allowed, &trimmed_iov, &trimmed_iovcnt) < 0) {
release_unused_reservation(allowed, -1);
return -1;
}
ret = real_pwritev64(fd, trimmed_iov, trimmed_iovcnt, offset);
free(trimmed_iov);
release_unused_reservation(allowed, ret);
return ret;
}
/*
* Defensive FORTIFY entry points. glibc does not currently route write()
* through these symbols, but other libc/toolchain combinations may. Keep
* the fortify size check on the caller's original request before applying
* the shim's usual trimming behavior.
*/
ssize_t __write_chk(int fd, const void *buf, size_t count, size_t buflen) {
if (count > buflen)
abort();
return write(fd, buf, count);
}
ssize_t __pwrite_chk(
int fd,
const void *buf,
size_t count,
off_t offset,
size_t buflen
) {
if (count > buflen)
abort();
return pwrite(fd, buf, count, offset);
}
ssize_t __pwrite64_chk(
int fd,
const void *buf,
size_t count,
off64_t offset,
size_t buflen
) {
if (count > buflen)
abort();
return pwrite64(fd, buf, count, offset);
}
static int write_all_budgeted(int fd, const char *buf, size_t len) {
size_t done = 0;
while (done < len) {
ssize_t ret = write(fd, buf + done, len - done);
if (ret < 0)
return -1;
if (ret == 0) {
errno = EIO;
return -1;
}
done += (size_t)ret;
}
return (int)done;
}
static int budgeted_vdprintf_chk(
int fd,
int flag,
const char *format,
va_list ap
) {
char *formatted = NULL;
va_list ap_copy;
int len;
int ret;
pthread_once(&init_once, init_symbols);
va_copy(ap_copy, ap);
if (real___vasprintf_chk != NULL)
len = real___vasprintf_chk(&formatted, flag, format, ap_copy);
else
len = vasprintf(&formatted, format, ap_copy);
va_end(ap_copy);
if (len < 0)
return -1;
ret = write_all_budgeted(fd, formatted, (size_t)len);
free(formatted);
return ret;
}
int vdprintf(int fd, const char *format, va_list ap) {
return budgeted_vdprintf_chk(fd, 0, format, ap);
}
int dprintf(int fd, const char *format, ...) {
va_list ap;
int ret;
va_start(ap, format);
ret = budgeted_vdprintf_chk(fd, 0, format, ap);
va_end(ap);
return ret;
}
int __vdprintf_chk(int fd, int flag, const char *format, va_list ap) {
return budgeted_vdprintf_chk(fd, flag, format, ap);
}
int __dprintf_chk(int fd, int flag, const char *format, ...) {
va_list ap;
int ret;
va_start(ap, format);
ret = budgeted_vdprintf_chk(fd, flag, format, ap);
va_end(ap);
return ret;
}
/*
* fallocate / posix_fallocate do not have a short-success convention we can
* use. Refuse allocation ranges that would extend past ENOSPC_AFTER and
* otherwise delegate to the real libc entry point.
*/
int fallocate(int fd, int mode, off_t offset, off_t len) {
pthread_once(&init_once, init_symbols);
if (real_fallocate == NULL) {
errno = ENOSYS;
return -1;
}
if (fallocate_exceeds_budget(fd, mode, (off64_t)offset, (off64_t)len)) {
errno = ENOSPC;
return -1;
}
pthread_mutex_unlock(&lock);
return real_fallocate(fd, mode, offset, len);
}
ssize_t ret = real_writev(fd, iov, iovcnt);
if (ret > 0) {
pthread_mutex_lock(&lock);
total_written += ret;
pthread_mutex_unlock(&lock);
int fallocate64(int fd, int mode, off64_t offset, off64_t len) {
pthread_once(&init_once, init_symbols);
if (real_fallocate64 == NULL) {
errno = ENOSYS;
return -1;
}
pwrite64()
return ret;
} */
ssize_t pwrite64(int fd, const void *buf, size_t count, off_t offset) {
pthread_mutex_lock(&lock);
printf("pwrite64 %lu bytes\n", count);
if (total_written >= fail_after) {
pthread_mutex_unlock(&lock);
if (fallocate_exceeds_budget(fd, mode, offset, len)) {
errno = ENOSPC;
return -1;
}
pthread_mutex_unlock(&lock);
return real_fallocate64(fd, mode, offset, len);
}
ssize_t ret = real_pwrite64(fd, buf, count, offset);
int posix_fallocate(int fd, off_t offset, off_t len) {
pthread_once(&init_once, init_symbols);
if (real_posix_fallocate == NULL)
return ENOSYS;
if (ret > 0) {
pthread_mutex_lock(&lock);
total_written += ret;
pthread_mutex_unlock(&lock);
if (fallocate_exceeds_budget(fd, 0, (off64_t)offset, (off64_t)len))
return ENOSPC;
return real_posix_fallocate(fd, offset, len);
}
int posix_fallocate64(int fd, off64_t offset, off64_t len) {
pthread_once(&init_once, init_symbols);
if (real_posix_fallocate64 == NULL)
return ENOSYS;
if (fallocate_exceeds_budget(fd, 0, offset, len))
return ENOSPC;
return real_posix_fallocate64(fd, offset, len);
}
int fsync(int fd) {
pthread_once(&init_once, init_symbols);
if (real_fsync == NULL) {
errno = ENOSYS;
return -1;
}
return ret;
}
if (fail_if_exhausted() < 0)
return -1;
return real_fsync(fd);
}
int fdatasync(int fd) {
pthread_once(&init_once, init_symbols);
if (real_fdatasync == NULL) {
errno = ENOSYS;
return -1;
}
if (fail_if_exhausted() < 0)
return -1;
return real_fdatasync(fd);
}
/*
* ftruncate / ftruncate64: refuse any extension past the byte budget.
* Truncations *down* are still permitted — they don't consume space and
* may be used by HDF5 cleanup paths. The check is on the absolute target
* length, not on the delta, so it works for a freshly-opened fd too.
*/
int ftruncate(int fd, off_t length) {
pthread_once(&init_once, init_symbols);
if (real_ftruncate == NULL) {
errno = ENOSYS;
return -1;
}
if (truncate_length_exceeds_budget((off64_t)length)) {
errno = ENOSPC;
return -1;
}
return real_ftruncate(fd, length);
}
int ftruncate64(int fd, off64_t length) {
pthread_once(&init_once, init_symbols);
if (real_ftruncate64 == NULL) {
errno = ENOSYS;
return -1;
}
if (truncate_length_exceeds_budget(length)) {
errno = ENOSPC;
return -1;
}
return real_ftruncate64(fd, length);
}
/*
* Test/control API. Default ELF visibility makes these symbols dlsym-able
* from a test harness — declare them as extern in the test code.
*/
void enospc_shim_reset(size_t new_fail_after) {
pthread_mutex_lock(&lock);
total_written = 0;
if (new_fail_after != 0)
fail_after = new_fail_after;
pthread_mutex_unlock(&lock);
}
size_t enospc_shim_get_total(void) {
size_t total;
pthread_mutex_lock(&lock);
total = total_written;
pthread_mutex_unlock(&lock);
return total;
}
size_t enospc_shim_get_fail_after(void) {
size_t cap;
pthread_mutex_lock(&lock);
cap = fail_after;
pthread_mutex_unlock(&lock);
return cap;
}
+1 -1
View File
@@ -14,7 +14,7 @@ TEST_CASE("HDF5File_enospc") {
std::vector<uint8_t> small_vector(2056), large_vector(40 * 1024 * 1024);
REQUIRE_NOTHROW(file->SaveVector("/small", small_vector));
REQUIRE_NOTHROW(file->SaveVector("/large1", large_vector));
REQUIRE_THROWS(file->SaveVector("/large1", large_vector));
REQUIRE_THROWS(file->SaveVector("/large2", large_vector));
REQUIRE_THROWS(file->Close());
REQUIRE_NOTHROW(file.reset());
+22 -12
View File
@@ -37,21 +37,30 @@ void print_usage(Logger &logger) {
logger.Info(" -s<num> Start image number (default: 0)");
logger.Info(" -e<num> End image number (default: all)");
logger.Info(" -v Verbose output");
logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)");
logger.Info(" -F Use FFT indexing algorithm (shortcut for -XFFT)");
logger.Info(" -X<txt> Indexing algorithm (FFBIDX|FFT|FFTW|Auto|None)");
logger.Info(" -x No least-square beam center refinement");
logger.Info(" -W Include images in the written HDF5 file (otherwise only analysis results are saved)");
logger.Info(" -U Unmerged intensities are written to a text file");
logger.Info("");
logger.Info(" Spot finding");
logger.Info(" -T<num> Noise sigma level for spot finding (default: 3.0)");
logger.Info(" -t<num> Photon count threshold for spot finding (default: 10)");
logger.Info(" -d<num> High resolution limit for spot finding (default: 1.5)");
logger.Info(" -c<num> Max spot count (default: 250)");
logger.Info("");
logger.Info(" Indexing");
logger.Info(" -R[num] Rotation indexing (optional: min angular range deg)");
logger.Info(" -X<txt> Indexing algorithm (FFBIDX|FFT|FFTW|Auto|None)");
logger.Info(" -F Use FFT indexing algorithm (shortcut for -XFFT)");
logger.Info(" -S<num> Space group number - used for both indexing and scaling");
logger.Info(" -C<cell> Fix reference unit cell: -C\"a,b,c,alpha,beta,gamma\" (comma-separated, no spaces; quotes optional)");
logger.Info(" -x No least-square beam center refinement");
logger.Info("");
logger.Info(" Scaling and merging");
logger.Info(" -D<num> High resolution limit for scaling/merging (default: 0.0; no limit)");
logger.Info(" -S<num> Space group number");
logger.Info(" -M Scale and merge (refine mosaicity) and write scaled.hkl + image.dat");
logger.Info(" -P<txt> Partiality refinement fixed|rot|unity (default: fixed)");
logger.Info(" -A Anomalous mode (don't merge Friedel pairs)");
logger.Info(" -C<cell> Fix reference unit cell: -C\"a,b,c,alpha,beta,gamma\" (comma-separated, no spaces; quotes optional)");
logger.Info(" -c<num> Max spot count (default: 250)");
logger.Info(" -W HDF5 file with analysis results is written");
logger.Info(" -T<num> Noise sigma level for spot finding (default: 3.0)");
logger.Info(" -t<num> Photon count threshold for spot finding (default: 10)");
}
void trim_in_place(std::string& t) {
@@ -364,10 +373,12 @@ int main(int argc, char **argv) {
start_message.pixel_mask["default"] = pixel_mask.GetMask(experiment);
start_message.max_spot_count = experiment.GetMaxSpotCount();
start_message.write_images = write_output;
start_message.file_format = FileWriterFormat::NXmxIntegrated;
std::unique_ptr<FileWriter> writer;
try {
if (write_output)
if (!output_prefix.empty())
writer = std::make_unique<FileWriter>(start_message);
} catch (const std::exception &e) {
logger.Error("Failed to initialize file writer: {}", e.what());
@@ -536,7 +547,6 @@ int main(int argc, char **argv) {
logger.Info("Rotation Indexing found lattice");
}
// --- Optional: run scaling (mosaicity refinement) on accumulated reflections ---
// --- Optional: run scaling (mosaicity refinement) on accumulated reflections ---
if (run_scaling) {
logger.Info("Running scaling (mosaicity refinement) ...");
+36 -19
View File
@@ -71,26 +71,34 @@ void FileWriter::WriteHDF5(const DataMessage& msg) {
if (msg.number < 0)
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds, "No support for negative images");
const uint64_t file_number = (start_message.images_per_file == 0) ? 0 : msg.number / start_message.images_per_file;
const uint64_t image_number = (start_message.images_per_file == 0) ? msg.number : msg.number % start_message.images_per_file;
if (closed_files.contains(file_number))
return;
if (files.size() <= file_number)
files.resize(file_number + 1);
if (!files[file_number]) {
files[file_number] = std::make_unique<HDF5DataFile>(start_message, file_number);
if (format == FileWriterFormat::NXmxIntegrated && master_file)
files[file_number]->CreateFile(msg, master_file->GetFile());
}
files[file_number]->Write(msg, image_number);
if (files[file_number]->GetNumImages() == start_message.images_per_file) {
CloseFile(file_number);
if (format == FileWriterFormat::NXmxIntegrated && master_file) {
if (files.empty() )
files.resize(1);
if (!files[0]) {
files[0] = std::make_unique<HDF5DataFile>(start_message, 0, HDF5Metadata::MasterFileName(start_message));
files[0]->CreateFile(msg, master_file->GetFile());
}
files[0]->Write(msg, msg.number);
} else {
CloseOldFiles(static_cast<uint64_t>(msg.number));
const uint64_t file_number = (start_message.images_per_file == 0) ? 0 : msg.number / start_message.images_per_file;
const uint64_t image_number = (start_message.images_per_file == 0) ? msg.number : msg.number % start_message.images_per_file;
if (closed_files.contains(file_number))
return;
if (files.size() <= file_number)
files.resize(file_number + 1);
if (!files[file_number])
files[file_number] = std::make_unique<HDF5DataFile>(start_message, file_number,
HDF5Metadata::DataFileName(start_message, file_number));
files[file_number]->Write(msg, image_number);
if (files[file_number]->GetNumImages() == start_message.images_per_file) {
CloseFile(file_number);
} else {
CloseOldFiles(static_cast<uint64_t>(msg.number));
}
}
}
@@ -230,6 +238,15 @@ void FileWriter::WriteHDF5(const CompressedImage &msg) {
void FileWriter::WriteHDF5(const EndMessage &msg) {
if (master_file) {
std::lock_guard<std::mutex> lock(hdf5_mutex);
if (format == FileWriterFormat::NXmxIntegrated) {
try {
CloseFile(0);
} catch (...) {
throw;
}
}
master_file->Finalize(msg);
}
}
+48 -40
View File
@@ -20,9 +20,10 @@
#include "HDF5NXmx.h"
#include "../common/time_utc.h"
HDF5DataFile::HDF5DataFile(const StartMessage &msg, uint64_t in_file_number) {
file_number = in_file_number;
HDF5DataFile::HDF5DataFile(const StartMessage &msg, uint64_t file_number, const std::string &filename) :
filename(filename),
file_number(file_number),
write_images(msg.write_images.value_or(true)) {
if (msg.overwrite.has_value())
overwrite = msg.overwrite.value();
@@ -30,9 +31,14 @@ HDF5DataFile::HDF5DataFile(const StartMessage &msg, uint64_t in_file_number) {
ypixel = 0;
max_image_number = 0;
nimages = 0;
filename = HDF5Metadata::DataFileName(msg, file_number);
image_low = file_number * msg.images_per_file;
images_per_file = msg.images_per_file;
if (msg.file_format == FileWriterFormat::NXmxIntegrated) {
image_low = 0;
images_per_file = msg.number_of_images;
} else {
image_low = file_number * msg.images_per_file;
images_per_file = msg.images_per_file;
}
timestamp.reserve(images_per_file);
exptime.reserve(images_per_file);
@@ -110,7 +116,6 @@ HDF5DataFile::~HDF5DataFile() {
if (data_file) {
try {
data_set.reset();
data_set_image_number.reset();
data_file.reset();
if (manage_file) {
std::error_code ec;
@@ -124,42 +129,44 @@ HDF5DataFile::~HDF5DataFile() {
}
}
void HDF5DataFile::CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> in_data_file, bool integrated) {
HDF5Dcpl dcpl;
HDF5DataType data_type(msg.image.GetMode());
xpixel = msg.image.GetWidth();
ypixel = msg.image.GetHeight();
dcpl.SetCompression(msg.image.GetCompressionAlgorithm(), JFJochBitShuffleCompressor::DefaultBlockSize);
dcpl.SetChunking( {1, ypixel, xpixel});
H5Pset_fill_time(dcpl.GetID(), H5D_FILL_TIME_NEVER);
H5Pset_alloc_time(dcpl.GetID(), H5D_ALLOC_TIME_INCR);
switch (msg.image.GetMode()) {
case CompressedImageMode::Int8:
dcpl.SetFillValue8(INT8_MIN);
break;
case CompressedImageMode::Int16:
dcpl.SetFillValue16(INT16_MIN);
break;
case CompressedImageMode::Int32:
dcpl.SetFillValue32(INT32_MIN);
break;
default:
break;
}
void HDF5DataFile::CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> in_data_file) {
data_file = in_data_file;
HDF5Group(*data_file, "/entry").NXClass("NXentry");
HDF5Group(*data_file, "/entry/data").NXClass("NXdata");
if (write_images) {
HDF5Dcpl dcpl;
HDF5DataType data_type(msg.image.GetMode());
xpixel = msg.image.GetWidth();
ypixel = msg.image.GetHeight();
dcpl.SetCompression(msg.image.GetCompressionAlgorithm(), JFJochBitShuffleCompressor::DefaultBlockSize);
dcpl.SetChunking( {1, ypixel, xpixel});
H5Pset_fill_time(dcpl.GetID(), H5D_FILL_TIME_NEVER);
H5Pset_alloc_time(dcpl.GetID(), H5D_ALLOC_TIME_INCR);
switch (msg.image.GetMode()) {
case CompressedImageMode::Int8:
dcpl.SetFillValue8(INT8_MIN);
break;
case CompressedImageMode::Int16:
dcpl.SetFillValue16(INT16_MIN);
break;
case CompressedImageMode::Int32:
dcpl.SetFillValue32(INT32_MIN);
break;
default:
break;
}
HDF5Group(*data_file, "/entry/data").NXClass("NXdata");
HDF5DataSpace data_space({1, ypixel, xpixel}, {H5S_UNLIMITED, ypixel, xpixel});
data_set = std::make_unique<HDF5DataSet>(*data_file, "/entry/data/data", data_type, data_space, dcpl);
data_set->SetExtent({images_per_file, ypixel, xpixel});
}
HDF5DataSpace data_space({1, ypixel, xpixel}, {H5S_UNLIMITED, ypixel, xpixel});
data_set = std::make_unique<HDF5DataSet>(*data_file, "/entry/data/data", data_type, data_space, dcpl);
data_set->SetExtent({images_per_file, ypixel, xpixel});
for (auto &p: plugins)
p->OpenFile(*data_file, msg, images_per_file);
}
@@ -186,7 +193,8 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
}
nimages++;
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), {image_number, 0, 0});
if (data_set)
data_set->WriteDirectChunk(msg.image.GetCompressed(), msg.image.GetCompressedSize(), {image_number, 0, 0});
for (auto &p: plugins)
p->Write(msg, image_number);
+6 -5
View File
@@ -22,12 +22,14 @@ struct HDF5DataFileStatistics {
};
class HDF5DataFile {
std::string filename;
const std::string filename;
const uint64_t file_number;
const bool write_images;
std::string tmp_filename;
std::shared_ptr<HDF5File> data_file = nullptr;
std::unique_ptr<HDF5DataSet> data_set = nullptr;
std::unique_ptr<HDF5DataSet> data_set_image_number = nullptr;
std::vector<std::unique_ptr<HDF5DataFilePlugin>> plugins;
size_t images_per_file;
size_t xpixel;
@@ -46,17 +48,16 @@ class HDF5DataFile {
bool closed = false;
bool overwrite = false;
int64_t file_number;
bool new_file = true;
bool manage_file = false;
public:
HDF5DataFile(const StartMessage &msg, uint64_t file_number);
HDF5DataFile(const StartMessage &msg, uint64_t file_number, const std::string &filename);
~HDF5DataFile();
std::optional<HDF5DataFileStatistics> Close();
void Write(const DataMessage& msg, uint64_t image_number);
size_t GetNumImages() const;
void CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> data_file, bool integrated = false);
void CreateFile(const DataMessage& msg, std::shared_ptr<HDF5File> data_file);
};
#endif //HDF5DATAFILE_H
+3 -5
View File
@@ -11,15 +11,13 @@
#include "../common/time_utc.h"
#include "gemmi/symmetry.hpp"
namespace {
std::string GenFilename(const StartMessage &start) {
return fmt::format("{:s}_master.h5", start.file_prefix);
}
std::string HDF5Metadata::MasterFileName(const StartMessage &start) {
return fmt::format("{:s}_master.h5", start.file_prefix);
}
NXmx::NXmx(const StartMessage &start)
: start_message(start),
filename(GenFilename(start)) {
filename(HDF5Metadata::MasterFileName(start)) {
uint64_t tmp_suffix;
try {
if (!start.arm_date.empty())
+1
View File
@@ -9,6 +9,7 @@
#include "HDF5Objects.h"
namespace HDF5Metadata {
std::string MasterFileName(const StartMessage &msg);
std::string DataFileName(const StartMessage &msg, int64_t file_number);
}