From ace6a4671734d068e01352785b84d235debeec48 Mon Sep 17 00:00:00 2001 From: Charles Mita Date: Thu, 16 Aug 2018 17:35:27 +0100 Subject: [PATCH] Implement direct-chunk reading to read data when possible The HDF5 library was made thread-safe via excessive locking, so does not gain much from reads being parallelized. By using the H5DOread_chunk function (introduced in HDF5 1.10.2) we reduce the time spent the library, improving performance for when XDS uses multiple threads to process data. The decompression and type conversions have to be done manually however, and this is only used in a limited case. --- Makefile | 7 +- src/file.c | 315 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/file.h | 10 ++ src/filters.c | 58 ++++++++++ src/filters.h | 31 +++++ 5 files changed, 411 insertions(+), 10 deletions(-) create mode 100644 src/filters.c create mode 100644 src/filters.h diff --git a/Makefile b/Makefile index 9e80aa4..8fd50ff 100644 --- a/Makefile +++ b/Makefile @@ -39,14 +39,17 @@ $(BSLZ4_BUILD_DIR)/bitshuffle_core.o $(BSLZ4_BUILD_DIR)/iochain.o mkdir -p $(BUILD_DIR) ar rcs $@ $^ -$(BUILD_DIR)/durin-plugin.so: $(BUILD_DIR)/plugin.o $(BUILD_DIR)/file.o $(BUILD_DIR)/err.o $(BUILD_DIR)/bslz4.a +$(BUILD_DIR)/durin-plugin.so: $(BUILD_DIR)/plugin.o $(BUILD_DIR)/file.o $(BUILD_DIR)/err.o $(BUILD_DIR)/filters.o \ +$(BUILD_DIR)/bslz4.a mkdir -p $(BUILD_DIR) $(CC) $(CFLAGS) -shared $^ -o $(BUILD_DIR)/durin-plugin.so -$(BUILD_DIR)/example: $(BUILD_DIR)/test.o $(BUILD_DIR)/file.o $(BUILD_DIR)/err.o $(BUILD_DIR)/bslz4.a +$(BUILD_DIR)/example: $(BUILD_DIR)/test.o $(BUILD_DIR)/file.o $(BUILD_DIR)/err.o $(BUILD_DIR)/filters.o \ +$(BUILD_DIR)/bslz4.a mkdir -p $(BUILD_DIR) $(CC) $(CFLAGS) $^ -o $(BUILD_DIR)/example .PHONY: clean clean: rm -r $(BUILD_DIR) + rm -r $(BSLZ4_BUILD_DIR) diff --git a/src/file.c b/src/file.c index 38180c7..cc78f59 100644 --- a/src/file.c +++ b/src/file.c @@ -5,12 +5,14 @@ #include +#include #include #include #include #include #include "file.h" #include "err.h" +#include "filters.h" hid_t h5_int_type_from_width(const int width) { if (width == sizeof(char)) { @@ -55,6 +57,11 @@ void free_eiger_data_description(struct data_description_t *desc) { desc->extra = NULL; } +void free_optimised_eiger_data_description(struct data_description_t *desc) { + /* no extra memory is allocated for the optimised struct definition, so call the original */ + free_eiger_data_description(desc); +} + double scale_from_units(const char* unit_string) { if (strcasecmp("m", unit_string) == 0 || @@ -132,10 +139,19 @@ done: return retval; } -int get_frame(hid_t g_id, const char* name, hsize_t *frame_idx, hsize_t *frame_size, int data_width, void *buffer) { +int get_frame_simple(const struct data_description_t *desc, + const char *name, + hsize_t *frame_idx, + hsize_t *frame_size, + int data_width, + void *buffer) { + int retval = 0; herr_t err = 0; - hid_t ds_id, s_id, ms_id, t_id; + hid_t g_id, ds_id, s_id, ms_id, t_id; + + g_id = desc->data_group_id; + ds_id = H5Dopen2(g_id, name, H5P_DEFAULT); if (ds_id <= 0) { char message[64]; @@ -176,6 +192,162 @@ done: return retval; } + +int get_frame_from_chunk(const struct data_description_t *desc, + const char *ds_name, + hsize_t *frame_idx, + hsize_t *frame_size, + int requested_data_width, + void *buffer) { + + hid_t d_id = 0; + hid_t t_id = 0; + hsize_t c_offset[3] = {frame_idx[0], 0, 0}; + uint32_t c_filter_mask = 0; + hsize_t c_bytes; + void *c_buffer = NULL; + void *u_buffer = NULL; + const struct optimised_eiger_data_description_t *eiger_desc = desc->extra; + int retval = 0; + size_t raw_data_width = 0; + + if (frame_idx[1] != 0 || frame_idx[2] != 0) { + char message[64]; + sprintf(message, "Require frame selection starts at [n, 0, 0], not [n, %llu, %llu]", + frame_idx[1], frame_idx[2]); + ERROR_JUMP(-1, done, message); + } + + d_id = H5Dopen(desc->data_group_id, ds_name, H5P_DEFAULT); + if (d_id < 0) { + char message[64]; + sprintf(message, "Error opening dataset %.32s", ds_name); + ERROR_JUMP(-1, done, message); + } + + /* TODO: pass down the dataset's data_width so we don't need to do this */ + t_id = H5Dget_type(d_id); + if (t_id < 0) { + ERROR_JUMP(-1, done, "Error getting datatype"); + } + + raw_data_width = H5Tget_size(t_id); + + if (H5Dget_chunk_storage_size(d_id, c_offset, &c_bytes) < 0) { + char message[96]; + sprintf(message, "Error reading chunk size from %.32s for frame %llu", ds_name, frame_idx[0]); + ERROR_JUMP(-1, done, message); + } + if (c_bytes == 0) { + char message[96]; + sprintf(message, "Target chunk %llu has zero size for dataset %.32s", frame_idx[0], ds_name); + ERROR_JUMP(-1, done, message); + } + + c_buffer = malloc(c_bytes); + if (!c_buffer) { + char message[128]; + sprintf(message, "Unable to allocate chunk buffer for dataset %.32s - frame %llu, size %llu bytes", + ds_name, frame_idx[0], c_bytes); + ERROR_JUMP(-1, done, message); + } + + if (H5DOread_chunk(d_id, H5P_DEFAULT, c_offset, &c_filter_mask, c_buffer) < 0) { + char message[128]; + sprintf(message, "Error reading chunk %llu from dataset %.32s - size %llu bytes", + frame_idx[0], ds_name, c_bytes); + ERROR_JUMP(-1, done, message); + } + + if (raw_data_width == requested_data_width) { + /* can output straight to the target buffer */ + u_buffer = buffer; + } else { + /* must perform a type conversion, so require a second buffer */ + u_buffer = malloc(raw_data_width * frame_size[1] * frame_size[2]); + if (!u_buffer) { + ERROR_JUMP(-1, done, "Unable to allocate output buffer for bslz4 decompression"); + } + } + + if (bslz4_decompress( + eiger_desc->bs_filter_params, + c_bytes, + c_buffer, + raw_data_width * frame_size[1] * frame_size[2], + u_buffer) < 0) { + char message[128]; + sprintf(message, "Error processing chunk %llu from %.32s with bitshuffle_lz4", + frame_idx[0], ds_name); + ERROR_JUMP(-1, done, message); + } + + + if (u_buffer != buffer) { + int could_convert = 1; + /* transfer data to output buffer, performing data conversion as required */ + /* TODO: decide how conversion of data should work + * Should we sign extend? Neggia doesn't (casts from uint*), but may be more intuitive */ + if (requested_data_width == sizeof(int8_t)) { + if (raw_data_width == sizeof(int16_t)) { + CONVERT_BUFFER(u_buffer, int16_t, buffer, int8_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int32_t)) { + CONVERT_BUFFER(u_buffer, int32_t, buffer, int8_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int64_t)) { + CONVERT_BUFFER(u_buffer, int64_t, buffer, int8_t, frame_size[1] * frame_size[2]); + } else { + could_convert = 0; + } + } else if (requested_data_width == sizeof(int16_t)) { + if (raw_data_width == sizeof(int8_t)) { + CONVERT_BUFFER(u_buffer, int8_t, buffer, int16_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int32_t)) { + CONVERT_BUFFER(u_buffer, int32_t, buffer, int16_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int64_t)) { + CONVERT_BUFFER(u_buffer, int64_t, buffer, int16_t, frame_size[1] * frame_size[2]); + } else { + could_convert = 0; + } + } else if (requested_data_width == sizeof(int32_t)) { + if (raw_data_width == sizeof(int8_t)) { + CONVERT_BUFFER(u_buffer, int8_t, buffer, int32_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int16_t)) { + CONVERT_BUFFER(u_buffer, int16_t, buffer, int32_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int64_t)) { + CONVERT_BUFFER(u_buffer, int64_t, buffer, int32_t, frame_size[1] * frame_size[2]); + } else { + could_convert = 0; + } + } else if (requested_data_width == sizeof(int64_t)) { + if (raw_data_width == sizeof(int8_t)) { + CONVERT_BUFFER(u_buffer, int8_t, buffer, int64_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int16_t)) { + CONVERT_BUFFER(u_buffer, int16_t, buffer, int64_t, frame_size[1] * frame_size[2]); + } else if (raw_data_width == sizeof(int32_t)) { + CONVERT_BUFFER(u_buffer, int32_t, buffer, int64_t, frame_size[1] * frame_size[2]); + } else { + could_convert = 0; + } + } else { + could_convert = 0; + } + if (!could_convert) { + char message[128]; + sprintf(message, "Unsupported conversion of data width %lu to %d for %.32s", + raw_data_width, requested_data_width, ds_name); + ERROR_JUMP(-1, done, message); + } + } + +done: + if (c_buffer) free(c_buffer); + if (u_buffer && (u_buffer != buffer)) free(u_buffer); + if (t_id) H5Tclose(t_id); + if (d_id) H5Dclose(d_id); + return retval; +} + + int get_nxs_frame( const struct data_description_t *desc, const struct dataset_properties_t *ds_prop, @@ -192,7 +364,7 @@ int get_nxs_frame( sprintf(message, "Selected frame %d is out of range valid range [0, %d]", n, (int) ds_prop->dims[0] - 1); ERROR_JUMP(-1, done, message); } - retval = get_frame(desc->data_group_id, "data", frame_idx, frame_size, data_width, buffer); + retval = get_frame_simple(desc, "data", frame_idx, frame_size, data_width, buffer); if (retval < 0) { ERROR_JUMP(retval, done, ""); } @@ -228,7 +400,7 @@ int get_dectris_eiger_frame( idx = n - (frame_count - eiger_desc->block_sizes[block]); /* index in current block */ frame_idx[0] = idx; sprintf(data_name, "data_%06d", block + 1); - retval = get_frame(desc->data_group_id, data_name, frame_idx, frame_size, data_width, buffer); + retval = eiger_desc->raw_frame_func(desc, data_name, frame_idx, frame_size, data_width, buffer); if (retval < 0) { ERROR_JUMP(retval, done, ""); } @@ -602,6 +774,103 @@ done: return retval; } +int check_for_chunk_read( + hid_t g_id, + const char* ds_name, + struct optimised_eiger_data_description_t *desc) { + + int retval = 0; + int n_filters; + hsize_t cdims[3]; + hid_t ds_id, dcpl, s_id; + unsigned int filter_flags, filter_config; + char filter_name[16]; + size_t name_len = 16; + size_t cd_nelems = BS_H5_N_PARAMS; + hsize_t dims[3]; + H5Z_filter_t filter; + + dcpl = 0; + s_id = 0; + ds_id = 0; + + ds_id = H5Dopen2(g_id, ds_name, H5P_DEFAULT); + if (ds_id < 0) { + char message[64]; + sprintf(message, "Error opening dataset %.32s", ds_name); + ERROR_JUMP(-1, done, message) + } + + s_id = H5Dget_space(ds_id); + if (s_id < 0) { + char message[64]; + sprintf(message, "Error opening dataspace for %.32s", ds_name); + ERROR_JUMP(-1, done, message); + } + + if (3 != H5Sget_simple_extent_ndims(s_id)) { + goto done; + } + + if (H5Sget_simple_extent_dims(s_id, dims, NULL) < 0) { + char message[80]; + sprintf(message, "Error retriving dataset dimensions for %.32s", ds_name); + ERROR_JUMP(-1, done, message); + } + + dcpl = H5Dget_create_plist(ds_id); + if (dcpl < 0) { + ERROR_JUMP(-1, done, "Error getting dataset creation property list"); + } + + /* check the chunk layout matches the layout we expect of + * [1, frame_size_y, frame_size_x] (1 frame == 1 chunk) */ + int cndims = H5Pget_chunk(dcpl, 3, cdims); + if (cndims != 3) { + goto done; + } + if (cdims[0] != 1 || cdims[1] != dims[1] || cdims[2] != dims[2]) { + goto done; + } + + /* check for potential filters - only the bitshuffle filter is supported */ + n_filters = H5Pget_nfilters(dcpl); + if (n_filters < 0) { + ERROR_JUMP(-1, done, "Error retrieving number of filters on dataset"); + } else if (n_filters > 1) { + goto done; + } + + if (n_filters == 1) { + filter = H5Pget_filter2(dcpl, 0, &filter_flags, + &cd_nelems, desc->bs_filter_params, + name_len, filter_name, + &filter_config); + if (filter < 0) { + ERROR_JUMP(-1, done, "Error retrieving filter information"); + } + if (filter != BS_H5_FILTER_ID) { + goto done; + } + if (cd_nelems > BS_H5_N_PARAMS) { + char message[128]; + sprintf(message, + "More than expected number of parameters to bitshuffle filter - expected %d, was %lu", + BS_H5_N_PARAMS, cd_nelems); + ERROR_JUMP(-1, done, message); + } + } else { + desc->n_filter_params = 0; + } + + retval = 1; + +done: + if (dcpl) H5Pclose(dcpl); + if (s_id) H5Sclose(s_id); + if (ds_id) H5Dclose(ds_id); + return retval; +} int fill_data_descriptor(struct data_description_t *data_desc, struct det_visit_objects_t *visit_result) { int retval = 0; @@ -660,14 +929,42 @@ int fill_data_descriptor(struct data_description_t *data_desc, struct det_visit_ } if (data_desc->get_data_properties == &get_dectris_eiger_dataset_dims) { - /* setup the "extra eiger info" struct */ - struct eiger_data_description_t *eiger_desc = malloc(sizeof(*eiger_desc)); + + /* setup the "extra info" structs */ + struct eiger_data_description_t *eiger_desc; + struct optimised_eiger_data_description_t *o_eiger_desc; + + eiger_desc = malloc(sizeof(*eiger_desc)); if (!eiger_desc) { ERROR_JUMP(-1, done, "Memory error creating data description for Eiger"); } memset(eiger_desc, 0, sizeof(*eiger_desc)); - data_desc->extra = eiger_desc; - data_desc->free_extra = free_eiger_data_description; + eiger_desc->raw_frame_func = &get_frame_simple; + + o_eiger_desc = malloc(sizeof(*o_eiger_desc)); + if (!o_eiger_desc) { + free(eiger_desc); + ERROR_JUMP(-1, done, "Memory error creating data description for optimised Eiger"); + } + o_eiger_desc->base.raw_frame_func = &get_frame_from_chunk; + + /* check if we can perform the optimised chunk read */ + retval = check_for_chunk_read(data_desc->data_group_id, "data_000001", o_eiger_desc); + if (retval < 0) { + free(o_eiger_desc); + free(eiger_desc); + ERROR_JUMP(-1, done, ""); + } + if (retval) { + free(eiger_desc); + data_desc->extra = o_eiger_desc; + data_desc->free_extra = free_optimised_eiger_data_description; + } else { + free(o_eiger_desc); + data_desc->extra = eiger_desc; + data_desc->free_extra = free_eiger_data_description; + } + } else { data_desc->free_extra = free_nxs_data_description; } @@ -702,6 +999,8 @@ int extract_detector_info( if ((retval = data_desc->get_data_properties(data_desc, dataset_prop)) < 0) { ERROR_JUMP(retval, done, ""); } + + done: return retval; } diff --git a/src/file.h b/src/file.h index 9e97116..0733ef6 100644 --- a/src/file.h +++ b/src/file.h @@ -9,6 +9,7 @@ #include #include "err.h" +#include "filters.h" struct dataset_properties_t { int data_width; @@ -31,10 +32,19 @@ void free_nxs_data_description(struct data_description_t *desc); struct eiger_data_description_t { int n_data_blocks; int *block_sizes; + int (*raw_frame_func)(const struct data_description_t*, const char*, hsize_t*, hsize_t*, int, void*); }; void free_eiger_data_description(struct data_description_t *desc); +struct optimised_eiger_data_description_t { + struct eiger_data_description_t base; + int n_filter_params; + unsigned int bs_filter_params[BS_H5_N_PARAMS]; +}; + +void free_optimised_eiger_data_description(struct data_description_t *desc); + struct det_visit_objects_t { hid_t nxdata; hid_t nxdetector; diff --git a/src/filters.c b/src/filters.c new file mode 100644 index 0000000..49369df --- /dev/null +++ b/src/filters.c @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2018 Diamond Light Source Ltd. + * Author: Charles Mita + */ + +#include +#include "filters.h" +#include "err.h" +#include "bitshuffle.h" + +/* Required prototypes from bitshuffle.c but not included in header */ +uint64_t bshuf_read_uint64_BE(const void *buffer); +uint32_t bshuf_read_uint32_BE(const void *buffer); + + +/* + * Derived from the h5 filter code from the bitshuffle project (not included here) + */ +int bslz4_decompress( + const unsigned int* bs_params, + size_t in_size, + void *in_buffer, + size_t out_size, + void *out_buffer) { + + int retval = 0; + size_t size, elem_size, block_size, u_bytes; + + elem_size = bs_params[2]; + u_bytes = bshuf_read_uint64_BE(in_buffer); + + if (u_bytes != out_size) { + char message[64]; + sprintf(message, "Decompressed chunk is %lu bytes, expected %lu", u_bytes, out_size); + ERROR_JUMP(-1, done, message); + } + + block_size = bshuf_read_uint32_BE((const char *) in_buffer + 8) / elem_size; + if (!block_size) { + ERROR_JUMP(-1, done, "Read block bitshuffle lz4 block size as 0"); + } + /* skip over header */ + in_buffer += 12; + size = u_bytes / elem_size; + + if (bs_params[4] == BS_H5_PARAM_LZ4_COMPRESS) { + if (bshuf_decompress_lz4(in_buffer, out_buffer, size, elem_size, block_size) < 0) { + ERROR_JUMP(-1, done, "Error performing bitshuffle_lz4 decompression"); + } + } else { + if (bshuf_bitunshuffle(in_buffer, out_buffer, size, elem_size, block_size) < 0) { + ERROR_JUMP(-1, done, "Error performing bit unshuffle"); + } + } + +done: + return retval; +} diff --git a/src/filters.h b/src/filters.h new file mode 100644 index 0000000..7996783 --- /dev/null +++ b/src/filters.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2018 Diamond Light Source Ltd. + * Author: Charles Mita + */ + +#ifndef NXS_XDS_FILTER_H +#define NXS_XDS_FILTER_H + +#define BS_H5_N_PARAMS 5 +#define BS_H5_FILTER_ID 32008 +#define BS_H5_PARAM_LZ4_COMPRESS 2 + + +/* Perform type conversion during buffer copy */ +#define CONVERT_BUFFER(in, t_in, out, t_out, size) \ +{ \ + t_in *pin = in; \ + t_out *pout = out; \ + t_in *end = pin + size; \ + while (pin < end) *pout++ = (t_out) *pin++; \ +} + + +int bslz4_decompress( + const unsigned int* bs_params, + size_t in_size, + void *in_buffer, + size_t out_size, + void *out_buffer); + +#endif /* NXS_XDS_FILTER_H */