Remove unused core-writer

This commit is contained in:
2020-05-01 10:49:31 +02:00
parent 4a227344b8
commit 0d5d954e67
49 changed files with 250 additions and 19576 deletions
+2
View File
@@ -45,6 +45,8 @@ namespace core_buffer {
// If the RB is empty, how much time to wait before trying to read it again.
const size_t RB_READ_RETRY_INTERVAL_MS = 5;
const size_t LIVE_READ_BLOCK_SIZE = 10;
}
#endif //BUFFERCONFIG_HPP
-20
View File
@@ -1,20 +0,0 @@
add_subdirectory(external/)
file(GLOB SOURCES
src/*.cpp
src/writer/*.cpp
src/receiver/*.cpp
src/module/*.cpp)
add_library(core-writer STATIC ${SOURCES})
include_directories(core-writer external/)
target_include_directories(core-writer PUBLIC include/)
target_include_directories(core-writer PUBLIC external/)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
target_compile_definitions(core-writer PRIVATE DEBUG_OUTPUT)
endif()
enable_testing()
add_subdirectory(test/)
-1
View File
@@ -1 +0,0 @@
<mxfile host="Electron" modified="2020-04-08T10:27:56.987Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.9.9 Chrome/80.0.3987.163 Electron/8.2.1 Safari/537.36" etag="TCN9I0oaoSxg0ELRiQ8L" version="12.9.9" type="device"><diagram id="3znaM6-Qtt049MXpavv6" name="Page-1">7Vxbd6I6FP41fWwWIYTAY9u532d6zsyZ8zILJSozaCzEVs+vP4mAEhIBL1h7KWtRs4kBvn3fSTxDV+P56ySYjj6ykMZnthXOz9CLM9smmIizJCwyArShn1GGSRTmtDXhOvqP5kQrp86ikKZKR85YzKOpSuyzyYT2uUILkoTdqd0GLFbvOg2GVCNc94NYp/6IQj7KqJ5N1vQ3NBqOijtDN3+/cVB0zt8kHQUhuyuR0MszdJUwxrNP4/kVjSV4BS7Z915tuLp6sIROeJsvvPtx8+maw9ep6/bGd/wrQdfJOXSzYW6DeJa/cf60fFFAMEzYbJp3owmncxPwQa/obukPBlevK+SEsjHlyUJ0KQaynOwruYwUI9yt8XYL2qiEte25ADs2sojryzPJ3yXIWT5c3WmNiviQA7MNSPAYINXwpwY6BTmMgWUhz3Ehzs5YQxJiCVrpMOCKIMClUbqC1W9GVYA6CakcxDpDl3ejiNPradCXV++EvRG0ER+Lm76A4uNG9JtRbhJQBWWoyyd0KqgiA6y2Civ2OsIVnTyup4qcruffaJ9Gt3TpdeSDJwIp2RglNAjFh2+XcswJp0nQ5xGbaEgLzLgKZ8oT9odesZglgjJhE9HzchDFcYUUxNFwIpp9KocXBMmBSHimi/zCOApDeRsj/1QOD9iE577VtvZjaYNBQrrJQdhkvK39eYjmP7/zcPgp+D5iLFyMp+/fjwxW5Vs0GV7OBgMBYpU7widP5cf+Io4EXEmzMvQyXD/0VoSg/2e4RPvzjItRaE6Pgx6Nv7A0WoqFgY0fKh04m7bheo9xzsYrHrZkmIHZG3loYx/YpHTgivkjIHO4uduFujm0IHAavAwkNliNIc92RwJh8N7RRIAukBYKcZr62hlvoWcB5JYOUuFtxdIijbeOiDH80uG5BuV2wHoEeSZd8RY3+zo6CS9kIiBZEwdpGvVV/gqIk8U/onFuAWtF+Ck5AYjnFYQX85w3WWtRbn2hSSReSHJ9F/bxIBlS3mzTaKjkKrV+1WRzC1pC44BLr6YkSwbe5Hf4woSHK8nQBpddhO8OAmW9royfslnSp/mQ5XSlchfb8dRhvYrHyFDTBlrK0QqTPURLdyR9Nh5H/NlkqHyBHnDK2u6dtskoLMRBTEbFXOAGY/G47YJDHIA9uD5U1+IjUHYsXoXBbe2CyF9VaVSLB45zXDNht8i2TkScTkZMkBIcqtHlgbwHEjEs9kt/ZJu7dC0zjiYzWTr5tBwL8h1TKaqwF67IQpQE4cT9il5JFYpCg/TJ5Rj1fH1w8UKLMmWjgZ9HfGnfRYKB83Zm4hHx8/baxsvGotSoZhcrd2Gp7gKiI+QqmQGuwYs8yNgF+hC4SlyhGqNquap1FmPZQBFlNakhLvDLRg4d1Q85203+7G4qNk2A7Fym8GxAlBqUZkOIp+Du36MFgUg3GELqr/MmS/iIDdkkiF+uqRVbve7zgcky4dKq/KacL/KqbjDjbIPNUewNrrU2Op9r5aZRd/dUynPkAMtyVwd2FSk4l/NUnk8clJ8LT7OtkgpZQUq5UrkNrlTE/FY6KvxBsCh1m8oO6eZ33eUZ1kKZ3e2w9Q/PEM+kNLl9OPFMvdFrHc+cC7VRfUNFDiFRM1tHN0cnFdJg03y2G0sehtGtwlv3ZibXJywnkM7Tpa25EB2gNZ0vIS+ui09D+T+NRTMf
SzzbcrjsyqHuMJ5JtuWD9ZK9xtr0pBX5XssZbJ4gKkv5mY0GXp/2+5pKiCs9T6h5ZXIOdhqY28QGavwBDWstEALlIk5RxrkfSTUV654l9SlKKnGAInLuaQuuqSx4GLG6mVExZLdi9TQVYEBdswKExO9ZR1YAkY+W5+Arc2zIakqAROzsN83BH08d9Irnszo8q0P7kiJEAJVMu721bzgpbTCtWniOah59VIOgD6xyhKJKsecZi4GnGuGYiofPQvwEhBhVTOnWkcn9SDH+a0EtPvj69vdbh/T+/vzu5+evbdZK308FvH6WHAqAPISh6+Rnw0L/FguroSPSLEzEv/yM9ofZaCyQKf5rh3Mt23aH+WSR2l0i65F/hFBZB4bqEUtVM1I7bAlpRtTagKiOXFFLJ5VVYrsZNhu6qgNxugJWL1BGA/ml9JfcKxJNhr80pMWw0TSVkxqr7Qcxm4X3gXqxpg7UzW8gG1ge8aGdn3WPDhE27hhQZB1WAgO/I5Y0lBqy8OwN/iG38nxk4SymLWLNjLTc/vOr2PzTKux7UHNjO0iOCywkBANjFzo+QZWtnFKdfeRYLs7P+l4RkZk7HhSmz5GzZL5nmKnHENiebWFYnHFHooM25zS9gsMX2TByx5fcuJOWswgtIeg9VbnYEOOv1gIDQkpm3rE1sfAtoHyfGIx8B0mCOarfLtA4xv7fYrXzrltRK1u49gfOGKbvHvXX8+FoAVoHQNUtmuswPqvlT+vwzFXrrEXufwL7eY2w6ktYDE793/HNN9q/3TIUSLKNwY8jGNhTODbEArpRb+HrN8T/h1+UpwfuGrtKy3pzNoRBOlrV8HbWt1NZ80qAcCCuAwmEwuH6lQ13q5/l2HY5HbR9IALBNaMbxu16I1+LoO5VlKS8HNY96BjuMOpse2oEpnLR0yOPewrZzEw3edSnFckfQwqELlcYetJSYesLH4tqTRrcdl2sOZCXra3VQGjeUXD8Yo1orn/bKrPl618IQy//Bw==</diagram></mxfile>
-7
View File
@@ -1,7 +0,0 @@
file(GLOB SOURCES
bitshuffle/*.c
bitshuffle/*.cpp)
add_library(external STATIC ${SOURCES})
target_include_directories(external PUBLIC .)
include_directories(/usr/include)
-169
View File
@@ -1,169 +0,0 @@
/*
* Bitshuffle - Filter for improving compression of typed binary data.
*
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*
*/
#include "bitshuffle.h"
#include "bitshuffle_core.h"
extern "C" {
#include "bitshuffle_internals.h"
#include "lz4.h"
}
#include <stdio.h>
#include <string.h>
// Constants.
// Use fast decompression instead of safe decompression for LZ4.
#define BSHUF_LZ4_DECOMPRESS_FAST
// Macros.
#define CHECK_ERR_FREE_LZ(count, buf) if (count < 0) { \
free(buf); return count - 1000; }
/* Bitshuffle and compress a single block. */
int64_t bshuf_compress_lz4_block(ioc_chain *C_ptr, \
const size_t size, const size_t elem_size) {
int64_t nbytes, count;
void *tmp_buf_bshuf;
void *tmp_buf_lz4;
size_t this_iter;
const void *in;
void *out;
tmp_buf_bshuf = malloc(size * elem_size);
if (tmp_buf_bshuf == NULL) return -1;
tmp_buf_lz4 = malloc(LZ4_compressBound(size * elem_size));
if (tmp_buf_lz4 == NULL){
free(tmp_buf_bshuf);
return -1;
}
in = ioc_get_in(C_ptr, &this_iter);
ioc_set_next_in(C_ptr, &this_iter, (void*) ((char*) in + size * elem_size));
count = bshuf_trans_bit_elem(in, tmp_buf_bshuf, size, elem_size);
if (count < 0) {
free(tmp_buf_lz4);
free(tmp_buf_bshuf);
return count;
}
nbytes = LZ4_compress((const char*) tmp_buf_bshuf, (char*) tmp_buf_lz4, size * elem_size);
free(tmp_buf_bshuf);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf_lz4);
out = ioc_get_out(C_ptr, &this_iter);
ioc_set_next_out(C_ptr, &this_iter, (void *) ((char *) out + nbytes + 4));
bshuf_write_uint32_BE(out, nbytes);
memcpy((char *) out + 4, tmp_buf_lz4, nbytes);
free(tmp_buf_lz4);
return nbytes + 4;
}
/* Decompress and bitunshuffle a single block. */
int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr,
const size_t size, const size_t elem_size) {
int64_t nbytes, count;
void *out, *tmp_buf;
const void *in;
size_t this_iter;
int32_t nbytes_from_header;
in = ioc_get_in(C_ptr, &this_iter);
nbytes_from_header = bshuf_read_uint32_BE(in);
ioc_set_next_in(C_ptr, &this_iter,
(void*) ((char*) in + nbytes_from_header + 4));
out = ioc_get_out(C_ptr, &this_iter);
ioc_set_next_out(C_ptr, &this_iter,
(void *) ((char *) out + size * elem_size));
tmp_buf = malloc(size * elem_size);
if (tmp_buf == NULL) return -1;
#ifdef BSHUF_LZ4_DECOMPRESS_FAST
nbytes = LZ4_decompress_fast((const char*) in + 4, (char*) tmp_buf, size * elem_size);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf);
if (nbytes != nbytes_from_header) {
free(tmp_buf);
return -91;
}
#else
nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header,
size * elem_size);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf);
if (nbytes != size * elem_size) {
free(tmp_buf);
return -91;
}
nbytes = nbytes_from_header;
#endif
count = bshuf_untrans_bit_elem(tmp_buf, out, size, elem_size);
CHECK_ERR_FREE(count, tmp_buf);
nbytes += 4;
free(tmp_buf);
return nbytes;
}
/* ---- Public functions ----
*
* See header file for description and usage.
*
*/
size_t bshuf_compress_lz4_bound(const size_t size,
const size_t elem_size, size_t block_size) {
size_t bound, leftover;
if (block_size == 0) {
block_size = bshuf_default_block_size(elem_size);
}
if (block_size % BSHUF_BLOCKED_MULT) return -81;
// Note that each block gets a 4 byte header.
// Size of full blocks.
bound = (LZ4_compressBound(block_size * elem_size) + 4) * (size / block_size);
// Size of partial blocks, if any.
leftover = ((size % block_size) / BSHUF_BLOCKED_MULT) * BSHUF_BLOCKED_MULT;
if (leftover) bound += LZ4_compressBound(leftover * elem_size) + 4;
// Size of uncompressed data not fitting into any blocks.
bound += (size % BSHUF_BLOCKED_MULT) * elem_size;
return bound;
}
int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_compress_lz4_block, in, out, size,
elem_size, block_size);
}
int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size,
elem_size, block_size);
}
-123
View File
@@ -1,123 +0,0 @@
/*
* Bitshuffle - Filter for improving compression of typed binary data.
*
* This file is part of Bitshuffle
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*
*
* Header File
*
* Worker routines return an int64_t which is the number of bytes processed
* if positive or an error code if negative.
*
* Error codes:
* -1 : Failed to allocate memory.
* -11 : Missing SSE.
* -12 : Missing AVX.
* -80 : Input size not a multiple of 8.
* -81 : block_size not multiple of 8.
* -91 : Decompression error, wrong number of bytes processed.
* -1YYY : Error internal to compression routine with error code -YYY.
*/
#ifndef BITSHUFFLE_H
#define BITSHUFFLE_H
#include <stdlib.h>
#include "bitshuffle_core.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ---- bshuf_compress_lz4_bound ----
*
* Bound on size of data compressed with *bshuf_compress_lz4*.
*
* Parameters
* ----------
* size : number of elements in input
* elem_size : element size of typed data
* block_size : Process in blocks of this many elements. Pass 0 to
* select automatically (recommended).
*
* Returns
* -------
* Bound on compressed data size.
*
*/
size_t bshuf_compress_lz4_bound(const size_t size,
const size_t elem_size, size_t block_size);
/* ---- bshuf_compress_lz4 ----
*
 * Bitshuffle and compress the data using LZ4.
*
* Transpose within elements, in blocks of data of *block_size* elements then
* compress the blocks using LZ4. In the output buffer, each block is prefixed
* by a 4 byte integer giving the compressed size of that block.
*
* Output buffer must be large enough to hold the compressed data. This could
* be in principle substantially larger than the input buffer. Use the routine
* *bshuf_compress_lz4_bound* to get an upper limit.
*
* Parameters
* ----------
* in : input buffer, must be of size * elem_size bytes
* out : output buffer, must be large enough to hold data.
* size : number of elements in input
* elem_size : element size of typed data
* block_size : Process in blocks of this many elements. Pass 0 to
* select automatically (recommended).
*
* Returns
* -------
* number of bytes used in output buffer, negative error-code if failed.
*
*/
int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, const size_t
elem_size, size_t block_size);
/* ---- bshuf_decompress_lz4 ----
*
* Undo compression and bitshuffling.
*
* Decompress data then un-bitshuffle it in blocks of *block_size* elements.
*
* To properly unshuffle bitshuffled data, *size*, *elem_size* and *block_size*
 * must match the parameters used to compress the data.
*
* NOT TO BE USED WITH UNTRUSTED DATA: This routine uses the function
* LZ4_decompress_fast from LZ4, which does not protect against maliciously
* formed datasets. By modifying the compressed data, this function could be
* coerced into leaving the boundaries of the input buffer.
*
* Parameters
* ----------
* in : input buffer
* out : output buffer, must be of size * elem_size bytes
* size : number of elements in input
* elem_size : element size of typed data
* block_size : Process in blocks of this many elements. Pass 0 to
* select automatically (recommended).
*
* Returns
* -------
* number of bytes consumed in *input* buffer, negative error-code if failed.
*
*/
int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size);
#ifdef __cplusplus
} // extern "C"
#endif
#endif // BITSHUFFLE_H
File diff suppressed because it is too large Load Diff
-156
View File
@@ -1,156 +0,0 @@
/*
* Bitshuffle - Filter for improving compression of typed binary data.
*
* This file is part of Bitshuffle
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*
*
* Header File
*
* Worker routines return an int64_t which is the number of bytes processed
* if positive or an error code if negative.
*
* Error codes:
* -1 : Failed to allocate memory.
* -11 : Missing SSE.
* -12 : Missing AVX.
* -80 : Input size not a multiple of 8.
* -81 : block_size not multiple of 8.
* -91 : Decompression error, wrong number of bytes processed.
* -1YYY : Error internal to compression routine with error code -YYY.
*/
#ifndef BITSHUFFLE_CORE_H
#define BITSHUFFLE_CORE_H
// We assume GNU g++ defining `__cplusplus` has stdint.h
//#if (defined (__STDC_VERSION__) && __STDC_VERSION__ >= 199900L) || defined(__cplusplus)
#include <stdint.h>
//#else
// typedef unsigned char uint8_t;
// typedef unsigned short uint16_t;
// typedef unsigned int uint32_t;
// typedef signed int int32_t;
// typedef unsigned long long uint64_t;
// typedef long long int64_t;
//#endif
#include <stdlib.h>
// These are usually set in the setup.py.
#ifndef BSHUF_VERSION_MAJOR
#define BSHUF_VERSION_MAJOR 0
#define BSHUF_VERSION_MINOR 3
#define BSHUF_VERSION_POINT 3
#endif
#ifdef __cplusplus
extern "C" {
#endif
/* --- bshuf_using_SSE2 ----
*
 * Whether routines were compiled with the SSE2 instruction set.
*
* Returns
* -------
* 1 if using SSE2, 0 otherwise.
*
*/
int bshuf_using_SSE2(void);
/* ---- bshuf_using_AVX2 ----
*
 * Whether routines were compiled with the AVX2 instruction set.
*
* Returns
* -------
* 1 if using AVX2, 0 otherwise.
*
*/
int bshuf_using_AVX2(void);
/* ---- bshuf_default_block_size ----
*
* The default block size as function of element size.
*
* This is the block size used by the blocked routines (any routine
* taking a *block_size* argument) when the block_size is not provided
* (zero is passed).
*
* The results of this routine are guaranteed to be stable such that
* shuffled/compressed data can always be decompressed.
*
* Parameters
* ----------
* elem_size : element size of data to be shuffled/compressed.
*
*/
size_t bshuf_default_block_size(const size_t elem_size);
/* ---- bshuf_bitshuffle ----
*
* Bitshuffle the data.
*
* Transpose the bits within elements, in blocks of *block_size*
* elements.
*
* Parameters
* ----------
* in : input buffer, must be of size * elem_size bytes
* out : output buffer, must be of size * elem_size bytes
* size : number of elements in input
* elem_size : element size of typed data
* block_size : Do transpose in blocks of this many elements. Pass 0 to
* select automatically (recommended).
*
* Returns
* -------
* number of bytes processed, negative error-code if failed.
*
*/
int64_t bshuf_bitshuffle(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size);
/* ---- bshuf_bitunshuffle ----
*
* Unshuffle bitshuffled data.
*
* Untranspose the bits within elements, in blocks of *block_size*
* elements.
*
* To properly unshuffle bitshuffled data, *size*, *elem_size* and *block_size*
* must match the parameters used to shuffle the data.
*
* Parameters
* ----------
* in : input buffer, must be of size * elem_size bytes
* out : output buffer, must be of size * elem_size bytes
* size : number of elements in input
* elem_size : element size of typed data
* block_size : Do transpose in blocks of this many elements. Pass 0 to
* select automatically (recommended).
*
* Returns
* -------
* number of bytes processed, negative error-code if failed.
*
*/
int64_t bshuf_bitunshuffle(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size);
#ifdef __cplusplus
} // extern "C"
#endif
#endif // BITSHUFFLE_CORE_H
-75
View File
@@ -1,75 +0,0 @@
/*
* Bitshuffle - Filter for improving compression of typed binary data.
*
* This file is part of Bitshuffle
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*/
#ifndef BITSHUFFLE_INTERNALS_H
#define BITSHUFFLE_INTERNALS_H
// We assume GNU g++ defining `__cplusplus` has stdint.h
//#if (defined (__STDC_VERSION__) && __STDC_VERSION__ >= 199900L) || defined(__cplusplus)
#include <stdint.h>
//#else
// typedef unsigned char uint8_t;
// typedef unsigned short uint16_t;
// typedef unsigned int uint32_t;
// typedef signed int int32_t;
// typedef unsigned long long uint64_t;
// typedef long long int64_t;
//#endif
#include <stdlib.h>
#include "iochain.h"
// Constants.
#ifndef BSHUF_MIN_RECOMMEND_BLOCK
#define BSHUF_MIN_RECOMMEND_BLOCK 128
#define BSHUF_BLOCKED_MULT 8 // Block sizes must be multiple of this.
#define BSHUF_TARGET_BLOCK_SIZE_B 8192
#endif
// Macros.
#define CHECK_ERR_FREE(count, buf) if (count < 0) { free(buf); return count; }
#ifdef __cplusplus
extern "C" {
#endif
/* ---- Utility functions for internal use only ---- */
int64_t bshuf_trans_bit_elem(const void* in, void* out, const size_t size,
const size_t elem_size);
/* Read a 32 bit unsigned integer from a buffer big endian order. */
uint32_t bshuf_read_uint32_BE(const void* buf);
/* Write a 32 bit unsigned integer to a buffer in big endian order. */
void bshuf_write_uint32_BE(void* buf, uint32_t num);
int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size,
const size_t elem_size);
/* Function definition for worker functions that process a single block. */
typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr,
const size_t size, const size_t elem_size);
/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,
const size_t size, const size_t elem_size, size_t block_size);
#ifdef __cplusplus
} // extern "C"
#endif
#endif // BITSHUFFLE_INTERNALS_H
-90
View File
@@ -1,90 +0,0 @@
/*
 * IOchain - Distribute a chain of dependent IO events among threads.
*
* This file is part of Bitshuffle
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*
*/
#include <stdlib.h>
#include "iochain.h"
void ioc_init(ioc_chain *C, const void *in_ptr_0, void *out_ptr_0) {
#ifdef _OPENMP
omp_init_lock(&C->next_lock);
for (size_t ii = 0; ii < IOC_SIZE; ii ++) {
omp_init_lock(&(C->in_pl[ii].lock));
omp_init_lock(&(C->out_pl[ii].lock));
}
#endif
C->next = 0;
C->in_pl[0].ptr = in_ptr_0;
C->out_pl[0].ptr = out_ptr_0;
}
void ioc_destroy(ioc_chain *C) {
#ifdef _OPENMP
omp_destroy_lock(&C->next_lock);
for (size_t ii = 0; ii < IOC_SIZE; ii ++) {
omp_destroy_lock(&(C->in_pl[ii].lock));
omp_destroy_lock(&(C->out_pl[ii].lock));
}
#endif
}
const void * ioc_get_in(ioc_chain *C, size_t *this_iter) {
#ifdef _OPENMP
omp_set_lock(&C->next_lock);
#pragma omp flush
#endif
*this_iter = C->next;
C->next ++;
#ifdef _OPENMP
omp_set_lock(&(C->in_pl[*this_iter % IOC_SIZE].lock));
omp_set_lock(&(C->in_pl[(*this_iter + 1) % IOC_SIZE].lock));
omp_set_lock(&(C->out_pl[(*this_iter + 1) % IOC_SIZE].lock));
omp_unset_lock(&C->next_lock);
#endif
return C->in_pl[*this_iter % IOC_SIZE].ptr;
}
void ioc_set_next_in(ioc_chain *C, size_t* this_iter, void* in_ptr) {
C->in_pl[(*this_iter + 1) % IOC_SIZE].ptr = in_ptr;
#ifdef _OPENMP
omp_unset_lock(&(C->in_pl[(*this_iter + 1) % IOC_SIZE].lock));
#endif
}
void * ioc_get_out(ioc_chain *C, size_t *this_iter) {
#ifdef _OPENMP
omp_set_lock(&(C->out_pl[(*this_iter) % IOC_SIZE].lock));
#pragma omp flush
#endif
void *out_ptr = C->out_pl[*this_iter % IOC_SIZE].ptr;
#ifdef _OPENMP
omp_unset_lock(&(C->out_pl[(*this_iter) % IOC_SIZE].lock));
#endif
return out_ptr;
}
void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr) {
C->out_pl[(*this_iter + 1) % IOC_SIZE].ptr = out_ptr;
#ifdef _OPENMP
omp_unset_lock(&(C->out_pl[(*this_iter + 1) % IOC_SIZE].lock));
// *in_pl[this_iter]* lock released at the end of the iteration to avoid being
// overtaken by previous threads and having *out_pl[this_iter]* corrupted.
// Especially worried about thread 0, iteration 0.
omp_unset_lock(&(C->in_pl[(*this_iter) % IOC_SIZE].lock));
#endif
}
-94
View File
@@ -1,94 +0,0 @@
/*
 * IOchain - Distribute a chain of dependent IO events among threads.
*
* This file is part of Bitshuffle
* Author: Kiyoshi Masui <kiyo@physics.ubc.ca>
* Website: http://www.github.com/kiyo-masui/bitshuffle
* Created: 2014
*
* See LICENSE file for details about copyright and rights to use.
*
*
* Header File
*
* Similar in concept to a queue. Each task includes reading an input
* and writing output, but the location of the input/output (the pointers)
* depend on the previous item in the chain.
*
* This is designed for parallelizing blocked compression/decompression IO,
* where the destination of a compressed block depends on the compressed size
* of all previous blocks.
*
* Implemented with OpenMP locks.
*
*
* Usage
* -----
* - Call `ioc_init` in serial block.
* - Each thread should create a local variable *size_t this_iter* and
* pass its address to all function calls. Its value will be set
* inside the functions and is used to identify the thread.
* - Each thread must call each of the `ioc_get*` and `ioc_set*` methods
* exactly once per iteration, starting with `ioc_get_in` and ending
* with `ioc_set_next_out`.
* - The order (`ioc_get_in`, `ioc_set_next_in`, *work*, `ioc_get_out`,
* `ioc_set_next_out`, *work*) is most efficient.
* - Have each thread call `ioc_end_pop`.
* - `ioc_get_in` is blocked until the previous entry's
* `ioc_set_next_in` is called.
* - `ioc_get_out` is blocked until the previous entry's
* `ioc_set_next_out` is called.
* - There are no blocks on the very first iteration.
* - Call `ioc_destroy` in serial block.
* - Safe for num_threads >= IOC_SIZE (but less efficient).
*
*/
#ifndef IOCHAIN_H
#define IOCHAIN_H
#include <stdlib.h>
#ifdef _OPENMP
#include <omp.h>
#endif
#define IOC_SIZE 33
typedef struct ioc_ptr_and_lock {
#ifdef _OPENMP
omp_lock_t lock;
#endif
void *ptr;
} ptr_and_lock;
typedef struct ioc_const_ptr_and_lock {
#ifdef _OPENMP
omp_lock_t lock;
#endif
const void *ptr;
} const_ptr_and_lock;
typedef struct ioc_chain {
#ifdef _OPENMP
omp_lock_t next_lock;
#endif
size_t next;
const_ptr_and_lock in_pl[IOC_SIZE];
ptr_and_lock out_pl[IOC_SIZE];
} ioc_chain;
void ioc_init(ioc_chain *C, const void *in_ptr_0, void *out_ptr_0);
void ioc_destroy(ioc_chain *C);
const void * ioc_get_in(ioc_chain *C, size_t *this_iter);
void ioc_set_next_in(ioc_chain *C, size_t* this_iter, void* in_ptr);
void * ioc_get_out(ioc_chain *C, size_t *this_iter);
void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr);
#endif // IOCHAIN_H
File diff suppressed because it is too large Load Diff
-360
View File
@@ -1,360 +0,0 @@
/*
LZ4 - Fast LZ compression algorithm
Header File
Copyright (C) 2011-2015, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- LZ4 source repository : https://github.com/Cyan4973/lz4
- LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
*/
#pragma once
#if defined (__cplusplus)
extern "C" {
#endif
/*
* lz4.h provides block compression functions, and gives full buffer control to programmer.
* If you need to generate inter-operable compressed data (respecting LZ4 frame specification),
* and can let the library handle its own memory, please use lz4frame.h instead.
*/
/**************************************
* Version
**************************************/
#define LZ4_VERSION_MAJOR 1 /* for breaking interface changes */
#define LZ4_VERSION_MINOR 7 /* for new (non-breaking) interface capabilities */
#define LZ4_VERSION_RELEASE 1 /* for tweaks, bug-fixes, or development */
#define LZ4_VERSION_NUMBER (LZ4_VERSION_MAJOR *100*100 + LZ4_VERSION_MINOR *100 + LZ4_VERSION_RELEASE)
int LZ4_versionNumber (void);
/**************************************
* Tuning parameter
**************************************/
/*
* LZ4_MEMORY_USAGE :
* Memory usage formula : N->2^N Bytes (examples : 10 -> 1KB; 12 -> 4KB ; 16 -> 64KB; 20 -> 1MB; etc.)
* Increasing memory usage improves compression ratio
* Reduced memory usage can improve speed, due to cache effect
* Default value is 14, for 16KB, which nicely fits into Intel x86 L1 cache
*/
#define LZ4_MEMORY_USAGE 14
/**************************************
* Simple Functions
**************************************/
int LZ4_compress_default(const char* source, char* dest, int sourceSize, int maxDestSize);
int LZ4_decompress_safe (const char* source, char* dest, int compressedSize, int maxDecompressedSize);
/*
LZ4_compress_default() :
Compresses 'sourceSize' bytes from buffer 'source'
into already allocated 'dest' buffer of size 'maxDestSize'.
Compression is guaranteed to succeed if 'maxDestSize' >= LZ4_compressBound(sourceSize).
It also runs faster, so it's a recommended setting.
If the function cannot compress 'source' into a more limited 'dest' budget,
compression stops *immediately*, and the function result is zero.
As a consequence, 'dest' content is not valid.
This function never writes outside 'dest' buffer, nor read outside 'source' buffer.
sourceSize : Max supported value is LZ4_MAX_INPUT_VALUE
maxDestSize : full or partial size of buffer 'dest' (which must be already allocated)
return : the number of bytes written into buffer 'dest' (necessarily <= maxOutputSize)
or 0 if compression fails
LZ4_decompress_safe() :
compressedSize : is the precise full size of the compressed block.
maxDecompressedSize : is the size of destination buffer, which must be already allocated.
return : the number of bytes decompressed into destination buffer (necessarily <= maxDecompressedSize)
If destination buffer is not large enough, decoding will stop and output an error code (<0).
If the source stream is detected malformed, the function will stop decoding and return a negative result.
This function is protected against buffer overflow exploits, including malicious data packets.
It never writes outside output buffer, nor reads outside input buffer.
*/
/**************************************
* Advanced Functions
**************************************/
#define LZ4_MAX_INPUT_SIZE 0x7E000000 /* 2 113 929 216 bytes */
#define LZ4_COMPRESSBOUND(isize) ((unsigned)(isize) > (unsigned)LZ4_MAX_INPUT_SIZE ? 0 : (isize) + ((isize)/255) + 16)
/*
LZ4_compressBound() :
Provides the maximum size that LZ4 compression may output in a "worst case" scenario (input data not compressible)
This function is primarily useful for memory allocation purposes (destination buffer size).
Macro LZ4_COMPRESSBOUND() is also provided for compilation-time evaluation (stack memory allocation for example).
Note that LZ4_compress_default() compress faster when dest buffer size is >= LZ4_compressBound(srcSize)
inputSize : max supported value is LZ4_MAX_INPUT_SIZE
return : maximum output size in a "worst case" scenario
or 0, if input size is too large ( > LZ4_MAX_INPUT_SIZE)
*/
int LZ4_compressBound(int inputSize);
/*
LZ4_compress_fast() :
Same as LZ4_compress_default(), but allows to select an "acceleration" factor.
The larger the acceleration value, the faster the algorithm, but also the lesser the compression.
It's a trade-off. It can be fine tuned, with each successive value providing roughly +~3% to speed.
An acceleration value of "1" is the same as regular LZ4_compress_default()
Values <= 0 will be replaced by ACCELERATION_DEFAULT (see lz4.c), which is 1.
*/
int LZ4_compress_fast (const char* source, char* dest, int sourceSize, int maxDestSize, int acceleration);
/*
LZ4_compress_fast_extState() :
Same compression function, just using an externally allocated memory space to store compression state.
Use LZ4_sizeofState() to know how much memory must be allocated,
and allocate it on 8-bytes boundaries (using malloc() typically).
Then, provide it as 'void* state' to compression function.
*/
int LZ4_sizeofState(void);
int LZ4_compress_fast_extState (void* state, const char* source, char* dest, int inputSize, int maxDestSize, int acceleration);
/*
LZ4_compress_destSize() :
Reverse the logic, by compressing as much data as possible from 'source' buffer
into already allocated buffer 'dest' of size 'targetDestSize'.
This function either compresses the entire 'source' content into 'dest' if it's large enough,
or fill 'dest' buffer completely with as much data as possible from 'source'.
 *sourceSizePtr : will be modified to indicate how many bytes were read from 'source' to fill 'dest'.
New value is necessarily <= old value.
return : Nb bytes written into 'dest' (necessarily <= targetDestSize)
or 0 if compression fails
*/
int LZ4_compress_destSize (const char* source, char* dest, int* sourceSizePtr, int targetDestSize);
/*
LZ4_decompress_fast() :
originalSize : is the original and therefore uncompressed size
return : the number of bytes read from the source buffer (in other words, the compressed size)
If the source stream is detected malformed, the function will stop decoding and return a negative result.
Destination buffer must be already allocated. Its size must be a minimum of 'originalSize' bytes.
note : This function fully respect memory boundaries for properly formed compressed data.
It is a bit faster than LZ4_decompress_safe().
However, it does not provide any protection against intentionally modified data stream (malicious input).
Use this function in trusted environment only (data to decode comes from a trusted source).
*/
int LZ4_decompress_fast (const char* source, char* dest, int originalSize);
/*
LZ4_decompress_safe_partial() :
This function decompress a compressed block of size 'compressedSize' at position 'source'
into destination buffer 'dest' of size 'maxDecompressedSize'.
The function tries to stop decompressing operation as soon as 'targetOutputSize' has been reached,
reducing decompression time.
return : the number of bytes decoded in the destination buffer (necessarily <= maxDecompressedSize)
Note : this number can be < 'targetOutputSize' should the compressed block to decode be smaller.
Always control how many bytes were decoded.
If the source stream is detected malformed, the function will stop decoding and return a negative result.
This function never writes outside of output buffer, and never reads outside of input buffer. It is therefore protected against malicious data packets
*/
int LZ4_decompress_safe_partial (const char* source, char* dest, int compressedSize, int targetOutputSize, int maxDecompressedSize);
/***********************************************
* Streaming Compression Functions
***********************************************/
#define LZ4_STREAMSIZE_U64 ((1 << (LZ4_MEMORY_USAGE-3)) + 4)
#define LZ4_STREAMSIZE (LZ4_STREAMSIZE_U64 * sizeof(long long))
/*
* LZ4_stream_t
* information structure to track an LZ4 stream.
* important : init this structure content before first use !
* note : only allocate the structure directly if you are statically linking LZ4
* If you are using liblz4 as a DLL, please use below construction methods instead.
*/
typedef struct { long long table[LZ4_STREAMSIZE_U64]; } LZ4_stream_t;
/*
* LZ4_resetStream
* Use this function to init an allocated LZ4_stream_t structure
*/
void LZ4_resetStream (LZ4_stream_t* streamPtr);
/*
* LZ4_createStream will allocate and initialize an LZ4_stream_t structure
* LZ4_freeStream releases its memory.
* In the context of a DLL (liblz4), please use these methods rather than the static struct.
* They are more future proof, in case of a change of LZ4_stream_t size.
*/
LZ4_stream_t* LZ4_createStream(void);
int LZ4_freeStream (LZ4_stream_t* streamPtr);
/*
* LZ4_loadDict
* Use this function to load a static dictionary into LZ4_stream.
* Any previous data will be forgotten, only 'dictionary' will remain in memory.
* Loading a size of 0 is allowed.
* Return : dictionary size, in bytes (necessarily <= 64 KB)
*/
int LZ4_loadDict (LZ4_stream_t* streamPtr, const char* dictionary, int dictSize);
/*
* LZ4_compress_fast_continue
* Compress buffer content 'src', using data from previously compressed blocks as dictionary to improve compression ratio.
* Important : Previous data blocks are assumed to still be present and unmodified !
* 'dst' buffer must be already allocated.
* If maxDstSize >= LZ4_compressBound(srcSize), compression is guaranteed to succeed, and runs faster.
* If not, and if compressed data cannot fit into 'dst' buffer size, compression stops, and function returns a zero.
*/
int LZ4_compress_fast_continue (LZ4_stream_t* streamPtr, const char* src, char* dst, int srcSize, int maxDstSize, int acceleration);
/*
* LZ4_saveDict
* If previously compressed data block is not guaranteed to remain available at its memory location
* save it into a safer place (char* safeBuffer)
* Note : you don't need to call LZ4_loadDict() afterwards,
* dictionary is immediately usable, you can therefore call LZ4_compress_fast_continue()
* Return : saved dictionary size in bytes (necessarily <= dictSize), or 0 if error
*/
int LZ4_saveDict (LZ4_stream_t* streamPtr, char* safeBuffer, int dictSize);
/************************************************
* Streaming Decompression Functions
************************************************/
#define LZ4_STREAMDECODESIZE_U64 4
#define LZ4_STREAMDECODESIZE (LZ4_STREAMDECODESIZE_U64 * sizeof(unsigned long long))
typedef struct { unsigned long long table[LZ4_STREAMDECODESIZE_U64]; } LZ4_streamDecode_t;
/*
* LZ4_streamDecode_t
* information structure to track an LZ4 stream.
* init this structure content using LZ4_setStreamDecode or memset() before first use !
*
* In the context of a DLL (liblz4) please prefer usage of construction methods below.
* They are more future proof, in case of a change of LZ4_streamDecode_t size in the future.
* LZ4_createStreamDecode will allocate and initialize an LZ4_streamDecode_t structure
* LZ4_freeStreamDecode releases its memory.
*/
LZ4_streamDecode_t* LZ4_createStreamDecode(void);
int LZ4_freeStreamDecode (LZ4_streamDecode_t* LZ4_stream);
/*
* LZ4_setStreamDecode
* Use this function to instruct where to find the dictionary.
* Setting a size of 0 is allowed (same effect as reset).
* Return : 1 if OK, 0 if error
*/
int LZ4_setStreamDecode (LZ4_streamDecode_t* LZ4_streamDecode, const char* dictionary, int dictSize);
/*
*_continue() :
These decoding functions allow decompression of multiple blocks in "streaming" mode.
Previously decoded blocks *must* remain available at the memory position where they were decoded (up to 64 KB)
In the case of ring buffers, the decoding buffer must be either :
- Exactly same size as encoding buffer, with same update rule (block boundaries at same positions)
In which case, the decoding & encoding ring buffer can have any size, including very small ones ( < 64 KB).
- Larger than encoding buffer, by a minimum of maxBlockSize more bytes.
maxBlockSize is implementation dependent. It's the maximum size you intend to compress into a single block.
In which case, encoding and decoding buffers do not need to be synchronized,
and encoding ring buffer can have any size, including small ones ( < 64 KB).
- _At least_ 64 KB + 8 bytes + maxBlockSize.
In which case, encoding and decoding buffers do not need to be synchronized,
and encoding ring buffer can have any size, including larger than decoding buffer.
Whenever these conditions are not possible, save the last 64KB of decoded data into a safe buffer,
and indicate where it is saved using LZ4_setStreamDecode()
*/
int LZ4_decompress_safe_continue (LZ4_streamDecode_t* LZ4_streamDecode, const char* source, char* dest, int compressedSize, int maxDecompressedSize);
int LZ4_decompress_fast_continue (LZ4_streamDecode_t* LZ4_streamDecode, const char* source, char* dest, int originalSize);
/*
Advanced decoding functions :
*_usingDict() :
These decoding functions work the same as
a combination of LZ4_setStreamDecode() followed by LZ4_decompress_x_continue()
They are stand-alone. They don't need nor update an LZ4_streamDecode_t structure.
*/
int LZ4_decompress_safe_usingDict (const char* source, char* dest, int compressedSize, int maxDecompressedSize, const char* dictStart, int dictSize);
int LZ4_decompress_fast_usingDict (const char* source, char* dest, int originalSize, const char* dictStart, int dictSize);
/**************************************
* Obsolete Functions
**************************************/
/* Deprecation Warnings */
/* Should these warnings messages be a problem,
it is generally possible to disable them,
with -Wno-deprecated-declarations for gcc
or _CRT_SECURE_NO_WARNINGS in Visual for example.
You can also define LZ4_DEPRECATE_WARNING_DEFBLOCK. */
#ifndef LZ4_DEPRECATE_WARNING_DEFBLOCK
# define LZ4_DEPRECATE_WARNING_DEFBLOCK
# define LZ4_GCC_VERSION (__GNUC__ * 100 + __GNUC_MINOR__)
# if (LZ4_GCC_VERSION >= 405) || defined(__clang__)
# define LZ4_DEPRECATED(message) __attribute__((deprecated(message)))
# elif (LZ4_GCC_VERSION >= 301)
# define LZ4_DEPRECATED(message) __attribute__((deprecated))
# elif defined(_MSC_VER)
# define LZ4_DEPRECATED(message) __declspec(deprecated(message))
# else
# pragma message("WARNING: You need to implement LZ4_DEPRECATED for this compiler")
# define LZ4_DEPRECATED(message)
# endif
#endif /* LZ4_DEPRECATE_WARNING_DEFBLOCK */
/* Obsolete compression functions */
/* These functions are planned to start generating warnings by r131 approximately */
int LZ4_compress (const char* source, char* dest, int sourceSize);
int LZ4_compress_limitedOutput (const char* source, char* dest, int sourceSize, int maxOutputSize);
int LZ4_compress_withState (void* state, const char* source, char* dest, int inputSize);
int LZ4_compress_limitedOutput_withState (void* state, const char* source, char* dest, int inputSize, int maxOutputSize);
int LZ4_compress_continue (LZ4_stream_t* LZ4_streamPtr, const char* source, char* dest, int inputSize);
int LZ4_compress_limitedOutput_continue (LZ4_stream_t* LZ4_streamPtr, const char* source, char* dest, int inputSize, int maxOutputSize);
/* Obsolete decompression functions */
/* These function names are completely deprecated and must no longer be used.
They are only provided here for compatibility with older programs.
- LZ4_uncompress is the same as LZ4_decompress_fast
- LZ4_uncompress_unknownOutputSize is the same as LZ4_decompress_safe
These function prototypes are now disabled; uncomment them only if you really need them.
It is highly recommended to stop using these prototypes and migrate to maintained ones */
/* int LZ4_uncompress (const char* source, char* dest, int outputSize); */
/* int LZ4_uncompress_unknownOutputSize (const char* source, char* dest, int isize, int maxOutputSize); */
/* Obsolete streaming functions; use new streaming interface whenever possible */
LZ4_DEPRECATED("use LZ4_createStream() instead") void* LZ4_create (char* inputBuffer);
LZ4_DEPRECATED("use LZ4_createStream() instead") int LZ4_sizeofStreamState(void);
LZ4_DEPRECATED("use LZ4_resetStream() instead") int LZ4_resetStreamState(void* state, char* inputBuffer);
LZ4_DEPRECATED("use LZ4_saveDict() instead") char* LZ4_slideInputBuffer (void* state);
/* Obsolete streaming decoding functions */
LZ4_DEPRECATED("use LZ4_decompress_safe_usingDict() instead") int LZ4_decompress_safe_withPrefix64k (const char* src, char* dst, int compressedSize, int maxDstSize);
LZ4_DEPRECATED("use LZ4_decompress_fast_usingDict() instead") int LZ4_decompress_fast_withPrefix64k (const char* src, char* dst, int originalSize);
#if defined (__cplusplus)
}
#endif
-9787
View File
File diff suppressed because it is too large Load Diff
-2604
View File
File diff suppressed because it is too large Load Diff
-29
View File
@@ -1,29 +0,0 @@
#ifndef BUFFEREDWRITER_H
#define BUFFEREDWRITER_H
#include "H5Writer.hpp"
#include "MetadataBuffer.hpp"
// H5Writer specialization that additionally caches per-frame metadata in an
// external MetadataBuffer and flushes it to the HDF5 file on request.
class BufferedWriter : public H5Writer
{
// Total number of frames expected for this acquisition.
const size_t total_frames;
// Shared in-memory cache of header/metadata values (owned by the caller).
MetadataBuffer& metadata_buffer;
public:
BufferedWriter(
const std::string& filename,
const size_t total_frames,
MetadataBuffer& metadata_buffer,
hsize_t frames_per_file=0,
hsize_t initial_dataset_size=1000,
hsize_t dataset_increase_step=1000);
// Store one metadata value named 'name' for 'frame_index' in the buffer
// (no file I/O happens here — presumably deferred; confirm in the .cpp).
virtual void cache_metadata(
const std::string& name,
const uint64_t frame_index,
const char* data);
// Flush all cached metadata into the currently open HDF5 file.
virtual void write_metadata_to_file();
};
#endif
-143
View File
@@ -1,143 +0,0 @@
#ifndef H5FORMAT_H
#define H5FORMAT_H
#include <string>
#include <list>
#include <unordered_map>
#include <H5Cpp.h>
#include <memory>
#include <tuple>
#include <boost/any.hpp>
#include <chrono>
#include "date.h"
typedef boost::any h5_value;
// Kind of node in an H5 format-definition tree.
enum NODE_TYPE
{
EMPTY_ROOT,
ATTRIBUTE,
DATASET,
GROUP
};
// Value types used by the format definition (NeXus-style NX_* names).
enum DATA_TYPE
{
NX_FLOAT,
NX_CHAR,
NX_INT,
NX_DATE_TIME,
NX_NUMBER,
NXnote
};
// Whether a node's value is stored inline (IMMEDIATE) or looked up by name
// in a values map (REFERENCE) — see H5FormatUtils::get_value_from_reference.
enum DATA_LOCATION
{
IMMEDIATE,
REFERENCE
};
// Base of every node in the format tree: a name plus its node kind.
struct h5_base
{
h5_base(const std::string& name, NODE_TYPE node_type) : name(name), node_type(node_type){};
virtual ~h5_base(){}
std::string name;
NODE_TYPE node_type;
};
// Mixin describing a node's value type and where the value comes from.
struct h5_data_base
{
h5_data_base(DATA_TYPE data_type, DATA_LOCATION data_location) : data_type(data_type), data_location(data_location) {};
virtual ~h5_data_base(){}
DATA_TYPE data_type;
DATA_LOCATION data_location;
};
// A node that owns child nodes.
struct h5_parent: public h5_base
{
h5_parent(const std::string& name, NODE_TYPE node_type, const std::list<std::shared_ptr<h5_base>>& items) :
h5_base(name, node_type), items(items) {};
std::list<std::shared_ptr<h5_base>> items;
};
// An HDF5 group node; children are optional.
struct h5_group : public h5_parent
{
h5_group(const std::string& name, const std::list<std::shared_ptr<h5_base>>& items={}) :
h5_parent(name, GROUP, items) {};
};
// An HDF5 dataset node. 'value' names the entry to fetch from the values
// map (data_location is hard-wired to REFERENCE in the constructor).
struct h5_dataset : public h5_parent, public h5_data_base
{
h5_dataset(const std::string& name, const std::string& value, DATA_TYPE data_type, const std::list<std::shared_ptr<h5_base>>& items={})
: h5_parent(name, DATASET, items), h5_data_base(data_type, REFERENCE), value(value) {};
std::string value;
};
// An HDF5 attribute node; value may be immediate or a reference.
struct h5_attr : public h5_base, public h5_data_base
{
h5_attr(const std::string& name, const h5_value& value, DATA_TYPE data_types, DATA_LOCATION data_location=IMMEDIATE)
: h5_base(name, ATTRIBUTE), h5_data_base(data_types, data_location), value(value){};
h5_value value;
};
// Abstract description of a target HDF5 file layout. Concrete formats supply
// the node tree and value typing; H5FormatUtils::write_format consumes it.
class H5Format
{
public:
virtual ~H5Format(){};
// Expected DATA_TYPE for each named input value.
virtual const std::unordered_map<std::string, DATA_TYPE>& get_input_value_type() const = 0;
// Default values — presumably applied when an input value is missing;
// confirm in concrete format implementations.
virtual const std::unordered_map<std::string, boost::any>& get_default_values() const = 0;
// Root of the h5_* node tree describing the file structure.
virtual const h5_parent& get_format_definition() const = 0;
// Derive additional values from the ones already collected.
virtual void add_calculated_values(std::unordered_map<std::string, boost::any>& values) const = 0;
// Merge externally supplied input values into 'values'.
virtual void add_input_values(std::unordered_map<std::string, boost::any>& values,
const std::unordered_map<std::string, boost::any>& input_values) const = 0;
// Source -> destination dataset names to relocate when finalizing the file.
virtual const std::unordered_map<std::string, std::string>& get_dataset_move_mapping() const = 0;
};
// Helpers for materializing an H5Format tree inside an open HDF5 file.
namespace H5FormatUtils
{
// Grow 'dataset' (in steps of 'dataset_increase_step') so 'frame_index' fits.
hsize_t expand_dataset(H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step);
// Shrink 'dataset' down to the frames actually written.
void compact_dataset(H5::DataSet& dataset, hsize_t max_frame_index);
H5::Group create_group(H5::Group& target, const std::string& name);
// Map a textual type name to the matching HDF5 predefined type.
const H5::PredType& get_dataset_data_type(const std::string& type);
// Overloads writing a dataset from a format node or from a plain value.
H5::DataSet write_dataset(H5::Group& target, const h5_dataset& dataset,
const std::unordered_map<std::string, boost::any>& values);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, double value);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, int value);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, const std::string& value);
// Overloads writing an attribute from a format node or from a plain value.
void write_attribute(H5::H5Object& target, const h5_attr& attribute,
const std::unordered_map<std::string, boost::any>& values);
void write_attribute(H5::H5Object& target, const std::string& name, const std::string& value);
void write_attribute(H5::H5Object& target, const std::string& name, int value);
// Resolve a REFERENCE value by name from the 'values' map.
const boost::any& get_value_from_reference(const std::string& dataset_name,
const boost::any& value_reference, const std::unordered_map<std::string, boost::any>& values);
// Recursively write 'format_node' and its children under 'file_node'.
void write_format_data(H5::Group& file_node, const h5_parent& format_node,
const std::unordered_map<std::string, h5_value>& values);
// Entry point: write the whole format tree of 'format' into 'file'.
void write_format(H5::H5File& file, const H5Format& format,
const std::unordered_map<std::string, h5_value>& input_values);
};
#endif
-44
View File
@@ -1,44 +0,0 @@
#ifndef H5WRITERMODULE_H
#define H5WRITERMODULE_H
#include <thread>
#include "RingBuffer.hpp"
#include "ZmqReceiver.hpp"
#include "H5Format.hpp"
// Drains frames from a shared RingBuffer on a background thread and writes
// them (plus cached metadata) into an HDF5 file — see H5WriteModule.cpp.
class H5WriteModule {
typedef std::unordered_map<std::string, HeaderDataType> header_map;
// Shared frame buffer, filled by the receiving side.
RingBuffer<FrameMetadata>& ring_buffer_;
// Header value name -> type mapping used for metadata caching.
const header_map& header_values_;
// File format written when a file is finalized.
const H5Format& format_;
// Set to false to ask the writer thread to stop.
std::atomic_bool is_writing_;
std::thread writing_thread_;
protected:
// Writer thread body (runs until stopped or n_frames written).
void write_thread(
const std::string& output_file,
const size_t n_frames,
const int user_id);
public:
H5WriteModule(
RingBuffer<FrameMetadata>& ring_buffer,
const header_map& header_values,
const H5Format& format);
virtual ~H5WriteModule();
// Spawn the writer thread; throws runtime_error if already writing.
void start_writing(
const std::string& output_file,
const size_t n_frames = 0,
const int user_id = -1
);
// Signal the writer thread to stop and join it.
void stop_writing();
bool is_writing();
};
#endif //H5WRITERMODULE_H
-50
View File
@@ -1,50 +0,0 @@
#ifndef H5WRITER_H
#define H5WRITER_H
#include <unordered_map>
#include <memory>
#include <vector>
#include <H5Cpp.h>
#include <chrono>
#include "date.h"
// Writes frame data into HDF5 files, creating and growing datasets on
// demand and optionally rolling to a new file every 'frames_per_file'.
class H5Writer
{
protected:
// Initialized in constructor.
std::string filename_;
hsize_t frames_per_file;
hsize_t initial_dataset_size;
hsize_t dataset_increase_step = 0;
// State variables.
hsize_t max_data_index = 0;
hsize_t current_frame_chunk = 0;
H5::H5File file;
// Open datasets and their current allocated sizes, keyed by dataset name.
std::unordered_map<std::string, H5::DataSet> datasets;
std::unordered_map<std::string, hsize_t> datasets_current_size;
// Ensure a dataset exists and is large enough for 'data_index';
// returns the storage index used — confirm exact contract in H5Writer.cpp.
hsize_t prepare_storage_for_data(const std::string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness);
void create_dataset(const std::string& dataset_name, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness, bool chunked, hsize_t dataset_size);
// Translate an absolute frame index into an index inside the current file.
size_t get_relative_data_index(const size_t data_index);
public:
H5Writer(const std::string& filename, hsize_t frames_per_file=0, hsize_t initial_dataset_size=1000, hsize_t dataset_increase_step=1000);
virtual ~H5Writer();
virtual bool is_file_open() const;
virtual void create_file(const hsize_t frame_chunk=1);
virtual void create_file(const std::string& filename, const hsize_t frame_chunk=1);
virtual void close_file();
// Write one frame's bytes into 'dataset_name' at 'data_index'.
virtual void write_data(const std::string& dataset_name, const size_t data_index, const char* data, const std::vector<size_t>& data_shape,
const size_t data_bytes_size, const std::string& data_type, const std::string& endianness);
virtual H5::H5File& get_h5_file();
// True when 'data_index' belongs to the currently open file.
virtual bool is_data_for_current_file(const size_t data_index);
};
#endif
-35
View File
@@ -1,35 +0,0 @@
#ifndef METADATABUFFER_H
#define METADATABUFFER_H
#include <unordered_map>
#include <string>
#include "ZmqReceiver.hpp"
// In-memory cache of per-frame header values: one flat buffer per header
// name, sized for 'n_slots' frames.
class MetadataBuffer
{
typedef std::unordered_map<std::string, HeaderDataType> header_map;
// Number of frames the buffer can hold.
const uint64_t n_slots_;
// Name -> type descriptor for each cached header value.
const header_map& header_values_type_;
protected:
// Raw storage per header name.
std::unordered_map<std::string, std::shared_ptr<char>> metadata_buffer;
// Bytes per entry for each header name.
std::unordered_map<std::string, size_t> metadata_length_bytes;
public:
MetadataBuffer(
const uint64_t n_slots,
const header_map& header_values_type);
// Copy one value for 'name' at 'frame_index' into the buffer.
void add_metadata_to_buffer(
const std::string& name,
const uint64_t frame_index,
const char* data);
// Whole cached buffer for 'name'.
std::shared_ptr<char> get_metadata_values(std::string name);
const header_map& get_header_values_type();
uint64_t get_n_slots();
};
#endif
-35
View File
@@ -1,35 +0,0 @@
#ifndef PROCESSMANAGER_H
#define PROCESSMANAGER_H
#include "H5WriteModule.hpp"
#include "ZmqRecvModule.hpp"
// Facade coordinating the receive (ZMQ) and write (HDF5) modules; exposed
// to clients through the REST API (RestApi.cpp).
class ProcessManager
{
H5WriteModule& write_module_;
ZmqRecvModule& recv_module_;
public:
ProcessManager(H5WriteModule& write_module,
ZmqRecvModule& recv_module);
// Blocks serving the REST API; stops receiving and writing on return.
void start_rest_api(const uint16_t rest_port);
// Requires receiving to be active; clears buffered frames, then starts
// the writer (throws runtime_error otherwise).
void start_writing(
const std::string& output_file,
const int n_frames,
const int user_id);
void stop_writing();
void start_receiving(
const std::string& connect_address,
const int n_receiving_threads);
void stop_receiving();
// "writing", "ready" (receiving only) or "idle".
std::string get_status();
// Currently a stub returning an empty map (see ProcessManager.cpp).
std::unordered_map<std::string, float> get_statistics();
};
#endif
-14
View File
@@ -1,14 +0,0 @@
#ifndef RESTAPI_H
#define RESTAPI_H
#include <chrono>
#include "date.h"
#include "ProcessManager.hpp"
// REST front-end; start_rest_api() blocks until the HTTP server stops.
namespace RestApi
{
void start_rest_api(ProcessManager& manager, uint16_t port);
}
#endif
-71
View File
@@ -1,71 +0,0 @@
#ifndef ZMQRECEIVER_H
#define ZMQRECEIVER_H
#include <string>
#include <memory>
#include <tuple>
#include <zmq.hpp>
#include <vector>
#include <memory>
#include <unordered_map>
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include "date.h"
#include "RingBuffer.hpp"
#include "config.hpp"
// Type descriptor for one ZMQ JSON header field: a named primitive type,
// either scalar or a fixed-shape array.
struct HeaderDataType
{
std::string type;
size_t value_shape;
std::string endianness;
size_t value_bytes_size;
// True for the array form — confirm exact semantics in ZmqReceiver.cpp.
bool is_array;
// Scalar value of 'type'.
HeaderDataType(const std::string& type);
// Array of 'shape' elements of 'type'.
HeaderDataType(const std::string& type, size_t shape);
};
size_t get_type_byte_size(const std::string& type);
void copy_value_to_buffer(
const char* buffer, size_t offset,
const boost::property_tree::ptree& json_value,
const HeaderDataType& header_data_type);
std::shared_ptr<char> get_value_from_json(
const boost::property_tree::ptree& json_header,
const std::string& name,
const HeaderDataType& header_data_type);
// ZMQ receiver that reads a JSON header message plus a binary data message
// and exposes them as (FrameMetadata, raw data pointer) pairs.
class ZmqReceiver
{
typedef std::unordered_map<std::string, HeaderDataType> header_map;
// Expected types for the header values to extract from the JSON header.
const header_map& header_values_type_;
zmq::context_t context_;
zmq::socket_t socket_;
// Reused message buffers for header and data parts.
zmq::message_t message_header_;
zmq::message_t message_data_;
boost::property_tree::ptree json_header;
public:
ZmqReceiver(
const header_map& header_values_type,
const int n_io_threads=config::zmq_n_io_threads
);
void connect(
const std::string& connect_address,
const int receive_timeout=config::zmq_receive_timeout);
void disconnect();
// Parse the JSON 'header' string into frame metadata.
std::shared_ptr<FrameMetadata> read_json_header(const std::string& header);
// Receive one (header, data) pair; timeout behavior is defined in
// ZmqReceiver.cpp — confirm before relying on it.
std::pair<std::shared_ptr<FrameMetadata>, char*> receive();
};
#endif
-38
View File
@@ -1,38 +0,0 @@
#ifndef ZMQRECVMODULE_H
#define ZMQRECVMODULE_H
#include <thread>
#include "ZmqReceiver.hpp"
#include "RingBuffer.hpp"
// Runs N receiving threads that pull frames over ZMQ into the shared
// RingBuffer; saving can be toggled independently of receiving.
class ZmqRecvModule
{
typedef std::unordered_map<std::string, HeaderDataType> header_map;
RingBuffer<FrameMetadata>& ring_buffer_;
const header_map& header_values_;
// Thread control flags; is_saving_ presumably gates buffer commits —
// confirm in receive_thread().
std::atomic_bool is_receiving_;
std::atomic_bool is_saving_;
std::vector<std::thread> receiving_threads_;
protected:
// Body of each receiving thread.
void receive_thread(
const std::string& connect_address);
public:
ZmqRecvModule(
RingBuffer<FrameMetadata>& ring_buffer,
const header_map& header_values);
virtual ~ZmqRecvModule();
void start_recv(const std::string& connect_address,
const uint8_t n_receiving_threads);
void stop_recv();
bool is_receiving();
void start_saving();
void stop_saving_and_clear_buffer();
};
#endif
-36
View File
@@ -1,36 +0,0 @@
#ifndef COMPRESSION_H
#define COMPRESSION_H
#include <cstddef>
#include <cstdint>
// Thin wrappers around the LZ4 and bitshuffle compression codecs.
namespace compression {
// Worst-case output buffer size for LZ4 given element count and size.
size_t get_lz4_max_buffer_size(size_t n_elements, size_t element_size);
size_t compress_lz4(const char* data,
size_t n_elements,
size_t element_size,
char* buffer,
size_t buffer_size);
size_t decompress_lz4(const char* compressed_data,
size_t compressed_size,
char* data);
// Worst-case output buffer size for bitshuffle compression.
size_t get_bitshuffle_max_buffer_size(size_t n_elements,
size_t element_size);
size_t compress_bitshuffle(const char* data,
size_t n_elements,
size_t element_size,
char* buffer);
// Decompression needs the original element count/size to reverse the shuffle.
size_t decompress_bitshuffle(const char* compressed_data,
size_t compressed_size,
size_t n_elements,
size_t element_size,
char* data);
};
#endif
-27
View File
@@ -1,27 +0,0 @@
#include <H5Cpp.h>
#include <string>
#ifndef CONFIG_H
#define CONFIG_H
// Global configuration knobs; definitions with defaults and per-value
// comments live in config.cpp.
namespace config
{
extern int zmq_n_io_threads;
extern int zmq_receive_timeout;
extern int zmq_buffer_size_header;
extern int zmq_buffer_size_data;
extern size_t ring_buffer_n_slots;
extern uint32_t ring_buffer_read_retry_interval;
extern hsize_t dataset_increase_step;
extern hsize_t initial_dataset_size;
extern std::string raw_image_dataset_name;
// NOTE(review): no definition for this appears in config.cpp below —
// confirm it is defined elsewhere before linking against it.
extern uint32_t parameters_read_retry_interval;
extern size_t recv_saving_wait_ms;
extern size_t udp_usec_timeout;
}
#endif
-127
View File
@@ -1,127 +0,0 @@
#include <iostream>
#include "RestApi.hpp"
#include "ProcessManager.hpp"
using namespace std;
// Wire up references to the write and receive modules; no other setup.
ProcessManager::ProcessManager(
H5WriteModule& write_module,
ZmqRecvModule& recv_module) :
write_module_(write_module),
recv_module_(recv_module)
{
}
// Run the REST server (blocking call). When the server returns — e.g.
// after SIGINT — make sure both receiving and writing are shut down.
void ProcessManager::start_rest_api(const uint16_t rest_port)
{
RestApi::start_rest_api(*this, rest_port);
// In case SIGINT stopped the rest_api.
stop_receiving();
stop_writing();
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ProcessManager::start_rest_api]";
cout << " Server stopped." << endl;
#endif
}
// Start writing 'n_frames' to 'output_file' as 'user_id'. Receiving must
// already be active, otherwise a runtime_error is thrown.
void ProcessManager::start_writing(
const string& output_file,
const int n_frames,
const int user_id)
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ProcessManager::start_writing]";
cout << " output_file " << output_file;
cout << " n_frames " << n_frames;
cout << " user_id " << user_id << endl;
#endif
if (!recv_module_.is_receiving()) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "] ";
err_msg << "[ProcessManager::start_writing]";
err_msg << " Cannot start writing. ";
err_msg << " Must start receiving first." << endl;
throw runtime_error(err_msg.str());
}
// Drop frames buffered before the writer existed, start the writer, and
// only then allow the receiver to commit new frames for saving.
recv_module_.stop_saving_and_clear_buffer();
write_module_.start_writing(output_file, n_frames, user_id);
recv_module_.start_saving();
}
// Stop the writer first, then stop saving and clear any leftover frames.
void ProcessManager::stop_writing()
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ProcessManager::stop_writing]" << endl;
#endif
write_module_.stop_writing();
recv_module_.stop_saving_and_clear_buffer();
}
// Start 'n_receiving_threads' ZMQ receiving threads connected to
// 'connect_address'. The thread count is narrowed to uint8_t as required
// by ZmqRecvModule::start_recv.
void ProcessManager::start_receiving(
const string& connect_address,
const int n_receiving_threads)
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
// Fix: the original emitted endl after the tag and never terminated the
// parameter line, leaving the debug output unflushed/concatenated.
cout << "[ProcessManager::start_receiving]";
cout << " connect_address " << connect_address;
cout << " n_receiving_threads " << n_receiving_threads << endl;
#endif
recv_module_.start_recv(
connect_address,
static_cast<uint8_t>(n_receiving_threads));
}
// Stop all receiving threads via the receive module.
void ProcessManager::stop_receiving()
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ProcessManager::stop_receiving]" << endl;
#endif
recv_module_.stop_recv();
}
// Report the coordinator state: "writing" while the writer runs,
// "ready" when only receiving is active, "idle" otherwise.
string ProcessManager::get_status()
{
// Writing takes precedence; the receiver is only consulted otherwise,
// preserving the original short-circuit call order.
if (write_module_.is_writing()) {
return "writing";
}
return recv_module_.is_receiving() ? "ready" : "idle";
}
// Placeholder: statistics collection is not implemented, so callers
// always receive an empty map.
unordered_map<std::string, float> ProcessManager::get_statistics()
{
// TODO: Implement statistics.
return {};
}
-119
View File
@@ -1,119 +0,0 @@
#include <sstream>
#include <string>
#include "crow_all.h"
#include "RestApi.hpp"
using namespace std;
// Serve the control REST API (blocking until the crow app stops).
// Routes:
//   POST/DELETE /writing    - start/stop writing (JSON body on POST)
//   POST/DELETE /receiving  - start/stop receiving (JSON body on POST)
//   GET /status             - current manager status
//   GET /statistics         - manager statistics plus status
void RestApi::start_rest_api(ProcessManager& manager, uint16_t port)
{
#ifdef DEBUG_OUTPUT
cout << "[RestApi::start_rest_api]";
cout << " Starting rest interface on port ";
cout << static_cast<int>(port) << endl;
#endif
crow::SimpleApp app;
CROW_ROUTE (app, "/writing").methods("POST"_method, "DELETE"_method)
([&](const crow::request& req)
{
if (req.method == "POST"_method) {
// NOTE(review): body fields are read without validation; a missing
// key's behavior depends on crow::json::load — confirm error handling.
auto request_parameters = crow::json::load(req.body);
string output_file =
request_parameters["output_file"].s();
int n_frames =
request_parameters["n_frames"].i();
int user_id =
request_parameters["user_id"].i();
manager.start_writing(output_file, n_frames, user_id);
crow::json::wvalue result;
result["state"] = "ok";
result["status"] = manager.get_status();
result["message"] = "Writing started.";
return result;
}
if (req.method == "DELETE"_method) {
manager.stop_writing();
crow::json::wvalue result;
result["state"] = "ok";
result["status"] = manager.get_status();
result["message"] = "Writing stopped.";
return result;
}
// Unreachable: the route only accepts POST and DELETE.
throw runtime_error("You should not see this.");
});
CROW_ROUTE (app, "/receiving").methods("POST"_method, "DELETE"_method)
([&](const crow::request& req)
{
if (req.method == "POST"_method) {
auto request_parameters = crow::json::load(req.body);
string url =
request_parameters["connect_address"].s();
int n_threads =
request_parameters["n_receiving_threads"].i();
manager.start_receiving(url, n_threads);
crow::json::wvalue result;
result["state"] = "ok";
result["status"] = manager.get_status();
result["message"] = "Receiving started.";
return result;
}
if (req.method == "DELETE"_method) {
manager.stop_receiving();
crow::json::wvalue result;
result["state"] = "ok";
result["status"] = manager.get_status();
result["message"] = "Receiving stopped.";
return result;
}
// Unreachable: the route only accepts POST and DELETE.
throw runtime_error("You should not see this.");
});
CROW_ROUTE (app, "/status")
([&](){
crow::json::wvalue result;
result["state"] = "ok";
result["status"] = manager.get_status();
return result;
});
CROW_ROUTE (app, "/statistics")
([&](){
crow::json::wvalue result;
// Flatten the statistics map into top-level JSON keys.
for (const auto& item : manager.get_statistics()) {
result[item.first] = item.second;
}
result["state"] = "ok";
result["status"] = manager.get_status();
return result;
});
app.loglevel(crow::LogLevel::ERROR);
// Blocks until the server is shut down.
app.port(port).run();
}
-31
View File
@@ -1,31 +0,0 @@
#include "config.hpp"
// Definitions of the tunables declared in config.hpp.
// NOTE(review): config.hpp also declares parameters_read_retry_interval,
// which has no definition here — confirm it is defined elsewhere.
namespace config {
// Number of receiving threads. Roughly 1 thread / (GB/s)
int zmq_n_io_threads = 1;
// ZMQ receive timeout; presumably milliseconds — confirm at the socket setup.
int zmq_receive_timeout = 100;
// JSON header buffer size - 1MB.
int zmq_buffer_size_header = 1024 * 1024 * 1;
// Data message buffer size - 15MB.
int zmq_buffer_size_data = 1024 * 1024 * 15;
// Ring buffer config.
// Allow for a couple of seconds (file creation might be slow).
size_t ring_buffer_n_slots = 1000;
// Delay before trying again to get data from the ring buffer.
uint32_t ring_buffer_read_retry_interval = 5;
// Name of the HDF5 dataset holding the raw image frames.
std::string raw_image_dataset_name = "raw_data";
// By how much to enlarge a dataset when a resizing is needed.
hsize_t dataset_increase_step = 1000;
// To which value to initialize a dataset size.
hsize_t initial_dataset_size = 1000;
// Max time for compress and commit already acquired RB slots.
size_t recv_saving_wait_ms = 100;
// Microseconds before the udp socket times out.
size_t udp_usec_timeout = 10 * 1000;
}
-264
View File
@@ -1,264 +0,0 @@
#include "H5WriteModule.hpp"
#include <iostream>
#include <MetadataBuffer.hpp>
#include "WriterUtils.hpp"
#include "BufferedWriter.hpp"
using namespace std;
// Store references to the shared ring buffer, header typing and target
// format; the writer thread is only created by start_writing().
H5WriteModule::H5WriteModule(
RingBuffer<FrameMetadata>& ring_buffer,
const header_map& header_values,
const H5Format& format) :
ring_buffer_(ring_buffer),
header_values_(header_values),
format_(format),
is_writing_(false)
{
}
// Ensure the writer thread is stopped and joined before destruction.
H5WriteModule::~H5WriteModule()
{
stop_writing();
}
// Spawn the writer thread for 'output_file'. Throws runtime_error when a
// writer is already running.
void H5WriteModule::start_writing(
const string& output_file,
const size_t n_frames,
const int user_id)
{
if (is_writing_ == true) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[H5WriteModule::start_writing]";
err_msg << " Writer already running." << endl;
throw runtime_error(err_msg.str());
}
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::start_writing]";
cout << " Start writing with parameters:" << endl;
cout << "\toutput_file: " << output_file;
cout << "\tn_frames: " << n_frames;
cout << "\tuser_id: " << user_id;
cout << endl;
#endif
// Reap a previously finished writer thread before spawning a new one.
if (writing_thread_.joinable()) {
writing_thread_.join();
}
is_writing_ = true;
writing_thread_ = thread(
&H5WriteModule::write_thread, this,
output_file,
n_frames,
user_id);
}
// Ask the writer thread to stop and wait for it to finish.
void H5WriteModule::stop_writing()
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::stop_writing]";
cout << " Disable writing." << endl;
#endif
is_writing_ = false;
if (writing_thread_.joinable()) {
writing_thread_.join();
}
}
// Writer thread body: drain frames from the ring buffer and write them,
// plus cached per-frame metadata, to 'output_file' until stopped or until
// 'n_frames' frames are written. When user_id != -1 the process effective
// uid is switched first. Any exception stops the writer and is rethrown.
void H5WriteModule::write_thread(
const std::string& output_file,
const size_t n_frames,
const int user_id)
{
try {
if (user_id != -1) {
WriterUtils::set_process_effective_id(user_id);
}
WriterUtils::create_destination_folder(output_file);
MetadataBuffer metadata_buffer(n_frames, header_values_);
BufferedWriter writer(output_file, n_frames, metadata_buffer);
writer.create_file();
auto raw_frames_dataset_name = config::raw_image_dataset_name;
size_t n_written_frames = 0;
while(is_writing_.load(memory_order_relaxed)) {
auto received_data = ring_buffer_.read();
// .first is nullptr if ringbuffer is empty.
if(received_data.first == nullptr) {
this_thread::sleep_for(chrono::milliseconds(
config::ring_buffer_read_retry_interval));
continue;
}
// Write file format before rolling to next file.
if (!writer.is_data_for_current_file(
received_data.first->frame_index)) {
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread] Frame index ";
cout << received_data.first->frame_index;
cout << " does not belong to current file. ";
cout << " Write format before switching file." << endl;
#endif
writer.write_metadata_to_file();
H5FormatUtils::write_format(
writer.get_h5_file(),
format_,
{});
}
#ifdef PERF_OUTPUT
using namespace date;
using namespace chrono;
auto start_time_frame = system_clock::now();
#endif
// Write image data.
writer.write_data(
raw_frames_dataset_name,
received_data.first->frame_index,
received_data.second,
received_data.first->frame_shape,
received_data.first->frame_bytes_size,
received_data.first->type,
received_data.first->endianness);
#ifdef PERF_OUTPUT
using namespace date;
using namespace chrono;
auto frame_time_difference = system_clock::now() - start_time_frame;
auto frame_diff_ms =
duration<float, milli>(frame_time_difference).count();
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread] Frame index ";
cout << received_data.first->frame_index;
cout << " written in " << frame_diff_ms << " ms." << endl;
#endif
// Slot can be reused as soon as the image bytes have been written.
ring_buffer_.release(received_data.first->buffer_slot_index);
#ifdef PERF_OUTPUT
using namespace date;
using namespace chrono;
auto start_time_metadata = system_clock::now();
#endif
// Cache header values in memory; flushed by write_metadata_to_file().
if (!header_values_.empty()) {
for (const auto &header_type : header_values_) {
auto &name = header_type.first;
auto value = received_data.first->header_values.at(name);
writer.cache_metadata(
name,
received_data.first->frame_index,
value.get());
}
}
n_written_frames++;
// n_frames == 0 means no limit: the counter never equals 0 after the
// first increment, so the loop runs until stop_writing().
if (n_written_frames == n_frames) {
is_writing_ = false;
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread]";
cout << " Written all n_frames " << n_written_frames;
cout << endl;
#endif
}
#ifdef PERF_OUTPUT
using namespace date;
using namespace chrono;
auto metadata_time_difference = system_clock::now() - start_time_metadata;
auto metadata_diff_ms = duration<float, milli>(metadata_time_difference).count();
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread] Frame metadata index ";
cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl;
#endif
}
// Finalize: flush metadata and the format tree, then close the file.
if (writer.is_file_open()) {
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread]";
cout << " Writing file format." << endl;
#endif
writer.write_metadata_to_file();
H5FormatUtils::write_format(writer.get_h5_file(), format_, {});
writer.close_file();
}
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread]";
cout << " Writer thread stopped." << endl;
#endif
} catch (const std::exception& e) {
is_writing_ = false;
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[H5WriteModule::write_thread]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
throw;
}
is_writing_ = false;
}
bool H5WriteModule::is_writing()
{
return is_writing_;
}
-250
View File
@@ -1,250 +0,0 @@
#include <config.hpp>
#include <iostream>
#include <compression.hpp>
#include "ZmqRecvModule.hpp"
using namespace std;
// Build a receiver module around a shared ring buffer.
//
// ring_buffer:   destination for received frame data; initialized lazily
//                by the first frame seen in receive_thread().
// header_values: mapping of extra ZMQ-header field names to their declared
//                types; forwarded to the per-thread ZmqReceiver.
//
// Both receiving and saving start disabled.
ZmqRecvModule::ZmqRecvModule(
        RingBuffer<FrameMetadata> &ring_buffer,
        const header_map &header_values) :
            ring_buffer_(ring_buffer),
            header_values_(header_values),
            is_receiving_(false),
            is_saving_(false)
{}
// Join all receiver threads before members they reference are destroyed.
ZmqRecvModule::~ZmqRecvModule()
{
    stop_recv();
}
// Launch n_receiving_threads receiver threads, each connecting its own
// ZmqReceiver to connect_address.
//
// Throws runtime_error when receivers are already running, or when the
// requested thread count is zero.
void ZmqRecvModule::start_recv(
        const string& connect_address,
        const uint8_t n_receiving_threads)
{
    // Guard against a double start: a second batch of threads would be
    // appended to receiving_threads_ on top of the running ones.
    if (is_receiving_ == true) {
        stringstream err_msg;
        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "]";
        err_msg << "[ZmqRecvModule::start_recv]";
        err_msg << " Receivers already running." << endl;
        throw runtime_error(err_msg.str());
    }
    if (n_receiving_threads < 1) {
        stringstream err_msg;
        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "]";
        err_msg << "[ZmqRecvModule::start_recv]";
        err_msg << " n_receiving_threads ";
        // Fixed: condition is "< 1", so the requirement is ">= 1" (old
        // text said "> 1"). The cast makes the uint8_t print as a number
        // instead of a raw character.
        err_msg << (int) n_receiving_threads << " must be >= 1." << endl;
        throw runtime_error(err_msg.str());
    }
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    // Fixed tag: this method is start_recv, not start.
    cout << "[ZmqRecvModule::start_recv]";
    cout << " Starting with ";
    cout << "connect_address " << connect_address;
    cout << " n_receiving_thread ";
    cout << (int) n_receiving_threads << endl;
#endif
    // TODO: Join threads if joinable and not yet joined.
    is_receiving_ = true;
    for (uint8_t i_rec=0; i_rec < n_receiving_threads; i_rec++) {
        receiving_threads_.emplace_back(
            &ZmqRecvModule::receive_thread, this, connect_address);
    }
}
// Signal every receiver thread to exit its loop, join it, and drop the
// handles. Safe to call when nothing is running.
void ZmqRecvModule::stop_recv()
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqRecvModule::stop_recv]";
    cout << " Stop receiving threads." << endl;
#endif
    is_receiving_ = false;
    for (auto &receiver : receiving_threads_) {
        if (!receiver.joinable()) {
            continue;
        }
        receiver.join();
    }
    receiving_threads_.clear();
}
bool ZmqRecvModule::is_receiving()
{
return is_receiving_;
}
// Enable forwarding of received frames into the ring buffer.
// Frames arriving while saving is disabled are dropped in receive_thread().
void ZmqRecvModule::start_saving()
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqRecvModule::start_saving]";
    cout << " Enable saving." << endl;
#endif
    is_saving_ = true;
}
// Disable saving, then clear the ring buffer.
//
// The sleep gives in-flight receiver iterations time to finish their
// reserve/commit before the buffer is cleared — an acknowledged stopgap,
// not a real synchronization mechanism (see the TODO below).
void ZmqRecvModule::stop_saving_and_clear_buffer()
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqRecvModule::stop_saving]";
    cout << " Disable saving." << endl;
#endif
    is_saving_ = false;
    // TODO: Solve the problem differently - control to RB?
    this_thread::sleep_for(chrono::milliseconds(
        config::recv_saving_wait_ms));
    ring_buffer_.clear();
}
// Body of one receiver thread.
//
// Connects a private ZmqReceiver to connect_address, then loops while
// is_receiving_ is set: frames are dropped unless saving is enabled;
// otherwise the frame payload is copied into a ring-buffer slot and
// committed. The ring buffer is sized lazily from the first frame seen
// (using the bitshuffle worst-case bound, so a slot can also hold
// compressed data).
//
// On any exception the thread clears is_receiving_ and rethrows.
void ZmqRecvModule::receive_thread(const string& connect_address)
{
    try {
        ZmqReceiver receiver(header_values_);
        receiver.connect(connect_address);
        bool rb_initialized(false);
        while (is_receiving_.load(memory_order_relaxed)) {
            auto frame = receiver.receive();
            // .first and .second = nullptr when no message received
            if (frame.first == nullptr ) {
                continue;
            }
            if (!is_saving_) {
                continue;
            }
            auto frame_metadata = frame.first;
            auto frame_data = frame.second;
#ifdef DEBUG_OUTPUT
            using namespace date;
            using namespace chrono;
            cout << "[" << system_clock::now() << "]";
            cout << "[ZmqRecvModule::receive_thread]";
            cout << " Processing FrameMetadata with frame_index ";
            cout << frame_metadata->frame_index;
            cout << " and frame_shape [" << frame_metadata->frame_shape[0];
            cout << ", " << frame_metadata->frame_shape[1] << "]";
            cout << " and endianness " << frame_metadata->endianness;
            cout << " and type " << frame_metadata->type;
            cout << " and frame_bytes_size ";
            cout << frame_metadata->frame_bytes_size << "." << endl;
#endif
            // Size the ring buffer from the first frame. Subsequent frames
            // are assumed to fit the same slot size.
            if (!rb_initialized) {
                size_t n_elements =
                    frame_metadata->frame_shape[0] *
                    frame_metadata->frame_shape[1];
                size_t max_buffer_size =
                    compression::get_bitshuffle_max_buffer_size(
                        n_elements,
                        frame_metadata->frame_bytes_size/n_elements);
                ring_buffer_.initialize(max_buffer_size);
                rb_initialized = true;
            }
            char* buffer = ring_buffer_.reserve(frame_metadata);
            if (buffer == nullptr) {
                stringstream err_msg;
                using namespace date;
                using namespace chrono;
                err_msg << "[" << system_clock::now() << "]";
                // Fixed tag: was "[UdpRecvModule::receive_thread]",
                // copy-pasted from another module.
                err_msg << "[ZmqRecvModule::receive_thread]";
                err_msg << " Ring buffer is full.";
                err_msg << endl;
                throw runtime_error(err_msg.str());
            }
            memcpy(
                buffer,
                static_cast<const char*>(frame_data),
                frame_metadata->frame_bytes_size);
            ring_buffer_.commit(frame_metadata);
        }
        receiver.disconnect();
#ifdef DEBUG_OUTPUT
        using namespace date;
        using namespace chrono;
        cout << "[" << system_clock::now() << "]";
        cout << "[ZmqRecvModule::receive_thread]";
        cout << " Receiver thread stopped." << endl;
#endif
    } catch (const std::exception& e) {
        is_receiving_ = false;
        using namespace date;
        using namespace chrono;
        cout << "[" << system_clock::now() << "]";
        cout << "[ZmqRecvModule::receive_thread]";
        cout << " Stopped because of exception: " << endl;
        cout << e.what() << endl;
        throw;
    }
}
-302
View File
@@ -1,302 +0,0 @@
#include <iostream>
#include <stdexcept>
#include "config.hpp"
#include "ZmqReceiver.hpp"
#include "H5Format.hpp"
using namespace std;
namespace pt = boost::property_tree;
// Describe a header field that is an array of `shape` elements of `type`
// (e.g. "uint32"). Endianness defaults to little; value_bytes_size is
// derived from the type name (get_type_byte_size throws on unknown names).
HeaderDataType::HeaderDataType(const std::string& type, size_t shape) :
    type(type),
    value_shape(shape),
    endianness("little"),
    is_array(true)
{
    value_bytes_size = get_type_byte_size(type);
}
// Describe a scalar header field of `type`: shape fixed to 1, not an
// array, little-endian by default.
HeaderDataType::HeaderDataType(const std::string& type) :
    type(type),
    value_shape(1),
    endianness("little"),
    is_array(false)
{
    value_bytes_size = get_type_byte_size(type);
}
// Map a textual type name ("uint8", "int32", "float64", ...) to its size
// in bytes. Throws runtime_error for any name outside the supported set.
size_t get_type_byte_size(const string& type)
{
    if (type == "int8"  || type == "uint8")  return 1;
    if (type == "int16" || type == "uint16") return 2;
    if (type == "int32" || type == "uint32" || type == "float32") return 4;
    if (type == "int64" || type == "uint64" || type == "float64") return 8;

    stringstream error_message;
    using namespace date;
    error_message << "[" << std::chrono::system_clock::now() << "]";
    error_message << "[ZmqReceiver::get_type_byte_size]";
    error_message << " Unsupported data type " << type << endl;
    throw runtime_error(error_message.str());
}
// Create a PULL receiver with its own ZMQ context.
//
// header_values_type: extra header fields (name -> declared type) to pull
//                     out of each JSON header.
// n_io_threads:       thread count for the ZMQ context.
//
// Header/data message buffers are pre-sized from config constants.
ZmqReceiver::ZmqReceiver(
    const header_map& header_values_type,
    const int n_io_threads) :
        header_values_type_(header_values_type),
        context_(n_io_threads),
        socket_(context_, ZMQ_PULL),
        message_header_(config::zmq_buffer_size_header),
        message_data_(config::zmq_buffer_size_data)
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqReceiver::ZmqReceiver]";
    cout << " Creating ZMQ receiver with";
    cout << " n_io_threads " << n_io_threads;
    cout << endl;
#endif
}
// Connect the PULL socket to `connect_address`, with `receive_timeout`
// (milliseconds) applied to blocking receives.
// The timeout option is set before connecting, as zmq requires options to
// be in place for the connection.
void ZmqReceiver::connect(
    const string& connect_address,
    const int receive_timeout)
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqReceiver::connect]";
    cout << " Connecting to address " << connect_address;
    cout << " with receive timeout " << receive_timeout << endl;
#endif
    socket_.set(zmq::sockopt::rcvtimeo, receive_timeout);
    socket_.connect(connect_address);
}
// Close both the socket and its context. The receiver cannot be reused
// after this call (receive() checks socket_.connected()).
void ZmqReceiver::disconnect()
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    using namespace chrono;
    cout << "[" << system_clock::now() << "]";
    cout << "[ZmqReceiver::disconnect]";
    cout << " Disconnect." << endl;
#endif
    socket_.close();
    context_.close();
}
// Receive one two-part message: a JSON header followed by the frame data.
//
// Returns {metadata, data pointer} on success, {nullptr, nullptr} on a
// receive timeout (header part) or on a lost data part. The returned data
// pointer aliases the internal message buffer and is only valid until the
// next receive() call. Throws runtime_error if called before connect().
pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
{
    if (!socket_.connected()) {
        stringstream error_message;
        using namespace date;
        using namespace chrono;
        error_message << "[" << system_clock::now() << "]";
        error_message << "[ZmqReceiver::receive]";
        error_message << " Cannot receive before connecting.";
        error_message << " Connect first." << endl;
        throw runtime_error(error_message.str());
    }
    // Get the message header.
    auto recv_n_bytes_header = socket_.recv(message_header_);
    // No value means the receive timed out (rcvtimeo set in connect()).
    if (!recv_n_bytes_header.has_value()){
        return {nullptr, nullptr};
    }
    auto header_string = string(
        static_cast<char*>(message_header_.data()),
        message_header_.size());
    auto frame_metadata = read_json_header(header_string);
    // Get the message data.
    auto recv_n_bytes_data = socket_.recv(message_data_);
    if (!recv_n_bytes_data.has_value()) {
        using namespace date;
        using namespace chrono;
        cout << "[" << system_clock::now() << "]";
        cout << "[ZmqReceiver::receive]";
        cout << " Error while reading from ZMQ.";
        cout << " Frame index " << frame_metadata->frame_index << " lost.";
        cout << " Trying to continue with the next frame." << endl;
        return {nullptr, nullptr};
    }
    // The actual payload size comes from the data part, not the header.
    frame_metadata->frame_bytes_size = message_data_.size();
    return {frame_metadata, static_cast<char*>(message_data_.data())};
}
// Parse a JSON message header into a FrameMetadata.
//
// Mandatory fields: "frame" (index), "shape" (array), "type". "endianness"
// defaults to "little" (Array 1.0 default). Any fields registered in
// header_values_type_ are extracted into header_values as raw byte blobs.
// On any parse/conversion failure, logs the offending header plus the
// expected custom fields and rethrows.
//
// NOTE(review): the parse target `json_header` is a member, so this method
// is not reentrant; appears safe because each receive thread owns its own
// ZmqReceiver — confirm before sharing an instance across threads.
shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
{
    try {
        stringstream header_stream;
        header_stream << header << endl;
        pt::read_json(header_stream, json_header);
        auto header_data = make_shared<FrameMetadata>();
        header_data->frame_index = json_header.get<uint64_t>("frame");
        for (const auto& item : json_header.get_child("shape")) {
            header_data->frame_shape.push_back(item.second.get_value<size_t>());
        }
        // Array 1.0 specified little endian as the default encoding.
        header_data->endianness = json_header.get("endianness", "little");
        header_data->type = json_header.get<string>("type");
        if (!header_values_type_.empty()) {
            for (const auto &value_mapping : header_values_type_) {
                const auto &name = value_mapping.first;
                const auto &header_data_type = value_mapping.second;
                auto value = get_value_from_json(
                    json_header, name, header_data_type);
                header_data->header_values.insert(
                    {name, value}
                );
            }
        }
        return header_data;
    } catch (...) {
        using namespace date;
        using namespace chrono;
        cout << "[" << system_clock::now() << "]";
        cout << "[ZmqReceiver::read_json_header]";
        cout << " Error while interpreting the JSON header. ";
        cout << " Header string: " << header << endl;
        cout << "Expected JSON header format: " << endl;
        if (!header_values_type_.empty()) {
            for (const auto &value_mapping : header_values_type_) {
                cout << "\t" << value_mapping.first << ":";
                cout << value_mapping.second.type;
                cout << "[" << value_mapping.second.value_shape << "]" << endl;
            }
        }
        throw;
    }
}
namespace {
// Decode json_value as T and memcpy the raw bytes into buffer + offset.
// n_bytes is the declared byte size of the header field (sizeof(T) for
// every supported type).
template <typename T>
void copy_typed_value(
    char* buffer,
    const size_t offset,
    const pt::ptree& json_value,
    const size_t n_bytes)
{
    auto value = json_value.get_value<T>();
    memcpy(buffer + offset, reinterpret_cast<char*>(&value), n_bytes);
}
}

// Convert one JSON scalar into its binary representation at
// buffer + offset, according to header_data_type.type.
// Throws runtime_error for an unsupported type name.
// (The ten copy-pasted branches of the original are collapsed into one
// template helper; behavior is unchanged.)
void copy_value_to_buffer(
    char* buffer,
    const size_t offset,
    const pt::ptree& json_value,
    const HeaderDataType& header_data_type)
{
    const auto& type = header_data_type.type;
    const auto n_bytes = header_data_type.value_bytes_size;
    if (type == "uint8") {
        copy_typed_value<uint8_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "uint16") {
        copy_typed_value<uint16_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "uint32") {
        copy_typed_value<uint32_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "uint64") {
        copy_typed_value<uint64_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "int8") {
        copy_typed_value<int8_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "int16") {
        copy_typed_value<int16_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "int32") {
        copy_typed_value<int32_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "int64") {
        copy_typed_value<int64_t>(buffer, offset, json_value, n_bytes);
    } else if (type == "float32") {
        copy_typed_value<float>(buffer, offset, json_value, n_bytes);
    } else if (type == "float64") {
        copy_typed_value<double>(buffer, offset, json_value, n_bytes);
    } else {
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "[ZmqReceiver::get_value_from_json]";
        error_message << " Unsupported header data type ";
        error_message << header_data_type.type << endl;
        throw runtime_error(error_message.str());
    }
}
// Extract the header field `name` from json_header into a freshly
// allocated byte buffer (value_shape elements of value_bytes_size bytes).
// Array fields are read element by element from the JSON child list;
// scalars are read directly. Throws (via copy_value_to_buffer / ptree)
// on missing fields or unsupported types.
shared_ptr<char> get_value_from_json(
    const pt::ptree& json_header,
    const string& name,
    const HeaderDataType& header_data_type)
{
    // Own the buffer with a unique_ptr first so the allocation is freed
    // if one of the JSON conversions below throws (the original raw
    // `new char[]` leaked on that path).
    unique_ptr<char[]> buffer(new char[
        header_data_type.value_bytes_size * header_data_type.value_shape]);
    if (header_data_type.is_array) {
        size_t index = 0;
        for (const auto& item : json_header.get_child(name)) {
            auto offset = index * header_data_type.value_bytes_size;
            copy_value_to_buffer(
                buffer.get(), offset, item.second, header_data_type);
            ++index;
        }
    } else {
        copy_value_to_buffer(
            buffer.get(), 0, json_header.get_child(name), header_data_type);
    }
    // Transfer ownership; the array deleter is preserved for the callers
    // that expect shared_ptr<char>.
    return shared_ptr<char>(buffer.release(), default_delete<char[]>());
}
-138
View File
@@ -1,138 +0,0 @@
#include "compression.hpp"
#include "bitshuffle/bitshuffle.h"
#include <stdexcept>
#include <arpa/inet.h>
extern "C" {
#include "bitshuffle/lz4.h"
}
using namespace std;
#define is_little_endian htonl(1) != 1
// Worst-case output size for compress_lz4: the LZ4 compression bound for
// the raw payload plus the 4-byte length header we prepend.
size_t compression::get_lz4_max_buffer_size(size_t n_elements,
                                            size_t element_size)
{
    const size_t payload_bytes = n_elements * element_size;
    return static_cast<size_t>(LZ4_compressBound(payload_bytes)) + 4;
}
// LZ4-compress `data` (n_elements * element_size bytes) into `buffer`.
//
// Output layout: 4-byte big-endian uncompressed length, then the LZ4
// block. Returns the total bytes written (compressed size + 4).
// Throws runtime_error if LZ4 reports failure (returns 0).
// `buffer` must be at least get_lz4_max_buffer_size(...) bytes.
size_t compression::compress_lz4(const char* data,
                                 size_t n_elements,
                                 size_t element_size,
                                 char* buffer,
                                 size_t buffer_size)
{
    size_t data_len = n_elements * element_size;
    // The bytes should be in big endian (network order).
    if(is_little_endian) {
        ((uint32_t*)buffer)[0] = htonl(data_len);
    } else {
        ((uint32_t*)buffer)[0] = data_len;
    }
    // Payload starts after the 4-byte header; leave room for it.
    size_t compressed_size = LZ4_compress_default(data, &buffer[4], data_len, buffer_size-4);
    if(!compressed_size) {
        throw runtime_error("Error while compressing [LZ4] channel:");
    }
    return compressed_size+4;
}
size_t compression::decompress_lz4(const char* compressed_data, size_t compressed_size, char* data) {
uint32_t expected_data_size;
if(is_little_endian) {
expected_data_size = ntohl(((uint32_t*)compressed_data)[0]);
} else {
expected_data_size = ((uint32_t*)compressed_data)[0];
}
// 4 bytes of our header.
compressed_size -= 4;
int decompressed_size = LZ4_decompress_safe(&compressed_data[4], data, compressed_size, expected_data_size);
if (expected_data_size != decompressed_size) {
throw runtime_error("Expected and decompressed data len do not match.");
}
// If the value is not positive, we throw an exception anyway.
return (size_t) decompressed_size;
}
// Worst-case output size for compress_bitshuffle: the bitshuffle/LZ4
// bound plus the 12-byte header (8-byte length + 4-byte block size).
size_t compression::get_bitshuffle_max_buffer_size(
    size_t n_elements,
    size_t element_size)
{
    const size_t bound =
        bshuf_compress_lz4_bound(n_elements, element_size, 0);
    return bound + 12;
}
// Bitshuffle+LZ4 compress `data` into `buffer`.
//
// Output layout matches the bitshuffle HDF5 filter: 8-byte big-endian
// uncompressed length, 4-byte big-endian block size (in bytes, i.e.
// multiplied by element_size), then the compressed stream at offset 12.
// Returns total bytes written (compressed size + 12); throws on failure.
size_t compression::compress_bitshuffle(const char* data, size_t n_elements, size_t element_size, char* buffer){
    size_t block_size = bshuf_default_block_size(element_size);
    uint64_t uncompressed_data_len = (uint64_t) n_elements * element_size;
    // The block size has to be multiplied by the elm_size before inserting it into the binary header.
    // https://github.com/kiyo-masui/bitshuffle/blob/04e58bd553304ec26e222654f1d9b6ff64e97d10/src/bshuf_h5filter.c#L167
    uint32_t header_block_size = (uint32_t) block_size * element_size;
    // The system is little endian, convert values to big endian (network order).
    // 64-bit byte swap done as two htonl'd 32-bit halves written in
    // reversed order.
    if (is_little_endian) {
        uint32_t high_bytes = htonl((uint32_t)(uncompressed_data_len >> 32));
        uint32_t low_bytes = htonl((uint32_t)(uncompressed_data_len & 0xFFFFFFFFLL));
        uncompressed_data_len = (((uint64_t)low_bytes) << 32) | high_bytes;
        header_block_size = htonl(header_block_size);
    }
    // Header: bytes [0,8) = length, bytes [8,12) = block size.
    ((int64_t*)buffer)[0] = uncompressed_data_len;
    ((int32_t*)buffer)[2] = header_block_size;
    auto compressed_size = bshuf_compress_lz4(data, &buffer[12], n_elements, element_size, block_size);
    if (compressed_size <= 0) {
        throw runtime_error("Error while compressing [LZ4] channel:");
    }
    // If the value is not positive, we throw an exception anyway.
    return (size_t)compressed_size+12;
}
// Inverse of compress_bitshuffle: parse the 12-byte header (8-byte
// big-endian uncompressed length, 4-byte big-endian block size in bytes),
// then decompress into `data`. Returns the uncompressed size from the
// header; throws if the consumed input does not match compressed_size.
size_t compression::decompress_bitshuffle(const char* compressed_data, size_t compressed_size,
                                          size_t n_elements, size_t element_size, char* data) {
    uint64_t header_expected_data_size = ((uint64_t*)compressed_data)[0];
    uint32_t header_block_size = ((uint32_t*)compressed_data)[2];
    size_t expected_data_size = header_expected_data_size;
    // Undo the two-half 64-bit byte swap applied by compress_bitshuffle.
    if (is_little_endian) {
        uint32_t high_bytes = ntohl((uint32_t)(header_expected_data_size & 0xFFFFFFFFLL));
        uint32_t low_bytes = ntohl((uint32_t)(header_expected_data_size >> 32));
        expected_data_size = (((uint64_t)high_bytes) << 32) | low_bytes;
        header_block_size = ntohl(header_block_size);
    }
    // The block size has to be multiplied by the elm_size before inserting it into the binary header.
    // https://github.com/kiyo-masui/bitshuffle/blob/04e58bd553304ec26e222654f1d9b6ff64e97d10/src/bshuf_h5filter.c#L167
    size_t block_size = header_block_size / element_size;
    auto n_processed_bytes = bshuf_decompress_lz4(&compressed_data[12], data, n_elements, element_size, block_size);
    // 12 bytes of our header.
    compressed_size -= 12;
    if (compressed_size != n_processed_bytes) {
        throw runtime_error("Compressed and processed data len do not match.");
    }
    return expected_data_size;
}
-77
View File
@@ -1,77 +0,0 @@
#include <iostream>
#include "H5Format.hpp"
#include "BufferedWriter.hpp"
using namespace std;
// H5Writer variant that additionally buffers per-frame metadata in memory
// (via metadata_buffer) and flushes it in write_metadata_to_file().
// File/dataset sizing parameters are forwarded unchanged to H5Writer.
BufferedWriter::BufferedWriter(
    const std::string& filename,
    size_t total_frames,
    MetadataBuffer& metadata_buffer,
    hsize_t frames_per_file,
    hsize_t initial_dataset_size,
    hsize_t dataset_increase_step) :
        H5Writer(
            filename,
            frames_per_file,
            initial_dataset_size,
            dataset_increase_step),
        total_frames(total_frames),
        metadata_buffer(metadata_buffer)
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[BufferedWriter::BufferedWriter] Creating buffered writer";
    cout << " with filename " << filename;
    cout << " and total_frames " << total_frames;
    cout << " and frames_per_file " << frames_per_file;
    cout << " and initial_dataset_size " << initial_dataset_size;
    cout << endl;
#endif
}
// Buffer one metadata value (`data`, raw bytes) for the given frame under
// dataset `name`. The frame index is translated to its position within
// the current file; flushing happens in write_metadata_to_file().
void BufferedWriter::cache_metadata(
    const string& name,
    const uint64_t frame_index,
    const char* data)
{
    const auto relative_frame_index =
        get_relative_data_index(static_cast<const size_t>(frame_index));
    metadata_buffer.add_metadata_to_buffer(name, relative_frame_index, data);
}
// Flush all cached metadata to the open HDF5 file: one dataset per
// registered header field, sized to the buffer's slot count, written as
// little-endian.
void BufferedWriter::write_metadata_to_file()
{
    auto header_values_type = metadata_buffer.get_header_values_type();
    if (!header_values_type.empty()) {
        for (const auto &header_type : header_values_type) {
            auto &dataset_name = header_type.first;
            auto &header_data_type = header_type.second;
            vector<size_t> data_shape = {header_data_type.value_shape};
            create_dataset(
                dataset_name,
                data_shape,
                header_data_type.type,
                header_data_type.endianness,
                false,
                metadata_buffer.get_n_slots());
            H5::AtomType dataset_data_type(
                H5FormatUtils::get_dataset_data_type(
                    header_data_type.type));
            // Metadata values are buffered in native (little-endian)
            // order; force the on-disk order to match.
            dataset_data_type.setOrder(H5T_ORDER_LE);
            auto &dataset = datasets.at(dataset_name);
            dataset.write(
                metadata_buffer.get_metadata_values(dataset_name).get(),
                dataset_data_type);
        }
    }
}
-391
View File
@@ -1,391 +0,0 @@
#include <string>
#include <sstream>
#include <stdexcept>
#include <iostream>
#include <vector>
#include "config.hpp"
#include "H5Format.hpp"
using namespace std;
// Grow the first (frame) dimension of `dataset` so that frame_index fits,
// leaving dataset_increase_step frames of headroom. All other dimensions
// are preserved. Returns the new size of dimension 0.
hsize_t H5FormatUtils::expand_dataset(H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step)
{
    const auto& data_space = dataset.getSpace();
    int dataset_rank = data_space.getSimpleExtentNdims();
    // std::vector instead of `hsize_t arr[dataset_rank]`: runtime-sized
    // arrays are a compiler extension, not standard C++.
    vector<hsize_t> dataset_dimension(dataset_rank);
    data_space.getSimpleExtentDims(dataset_dimension.data());
    dataset_dimension[0] = frame_index + dataset_increase_step;
#ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[H5FormatUtils::expand_dataset] Expanding dataspace to size (";
    for (int i=0; i<dataset_rank; ++i) {
        cout << dataset_dimension[i] << ",";
    }
    cout << ")" << endl;
#endif
    dataset.extend(dataset_dimension.data());
    return dataset_dimension[0];
}
// Shrink the first (frame) dimension of `dataset` to max_frame_index + 1,
// discarding the unused headroom left by expand_dataset. Contiguous
// (non-chunked) datasets cannot be resized and are left untouched.
void H5FormatUtils::compact_dataset(H5::DataSet& dataset, hsize_t max_frame_index)
{
    // Only chunked datasets can be resized.
    if (H5D_CHUNKED != dataset.getCreatePlist().getLayout()) {
#ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5FormatUtils::compact_dataset] Not compact a contiguous dataset." << endl;
#endif
        return;
    }
    const auto& data_space = dataset.getSpace();
    int dataset_rank = data_space.getSimpleExtentNdims();
    // std::vector instead of `hsize_t arr[dataset_rank]`: runtime-sized
    // arrays are a compiler extension, not standard C++.
    vector<hsize_t> dataset_dimension(dataset_rank);
    data_space.getSimpleExtentDims(dataset_dimension.data());
    dataset_dimension[0] = max_frame_index + 1;
#ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[H5FormatUtils::compact_dataset] Compacting dataspace to size (";
    for (int i=0; i<dataset_rank; ++i) {
        cout << dataset_dimension[i] << ",";
    }
    cout << ")" << endl;
#endif
    dataset.extend(dataset_dimension.data());
}
// Create and return a child group `name` under `target`
// (thin wrapper over H5::Group::createGroup).
H5::Group H5FormatUtils::create_group(H5::Group& target, const string& name)
{
    return target.createGroup(name.c_str());
}
// Resolve an indirect dataset value: `value_reference` must hold a string
// key, which is looked up in `values`. Returns a reference into `values`
// (valid only as long as the map lives).
// Throws runtime_error when the reference is not a string or the key is
// missing from the map.
const boost::any& H5FormatUtils::get_value_from_reference(const string& dataset_name,
    const boost::any& value_reference, const unordered_map<string, boost::any>& values)
{
    try {
        auto reference_string = boost::any_cast<string>(value_reference);
#ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5FormatUtils::get_value_from_reference] Getting dataset '"<< dataset_name;
        cout << "' reference value '" << reference_string << "'." << endl;
#endif
        return values.at(reference_string);
    } catch (const boost::bad_any_cast& exception) {
        stringstream error_message;
        using namespace date;
        error_message << "[" << chrono::system_clock::now() << "]";
        error_message << "Cannot convert dataset " << dataset_name << " value reference to string." << endl;
        throw runtime_error(error_message.str());
    } catch (const out_of_range& exception){
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        // The any_cast here cannot throw: reaching this handler means the
        // cast at the top of the try block already succeeded.
        error_message << "Dataset " << dataset_name << " value reference " << boost::any_cast<string>(value_reference);
        error_message << " not present in values map." << endl;
        throw runtime_error(error_message.str());
    }
}
// Map a frame type name to the corresponding native HDF5 predefined type.
// Throws runtime_error for unsupported names.
//
// float32/float64 added: get_type_byte_size and copy_value_to_buffer
// (ZmqReceiver) already accept them, and BufferedWriter::
// write_metadata_to_file feeds those header types into this function,
// which previously rejected them.
const H5::PredType& H5FormatUtils::get_dataset_data_type(const string& type)
{
#ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[H5FormatUtils::get_dataset_data_type] Getting dataset type for received frame type " << type << endl;
#endif
    if (type == "uint8") {
        return H5::PredType::NATIVE_UINT8;
    } else if (type == "uint16") {
        return H5::PredType::NATIVE_UINT16;
    } else if (type == "uint32") {
        return H5::PredType::NATIVE_UINT32;
    } else if (type == "uint64") {
        return H5::PredType::NATIVE_UINT64;
    } else if (type == "int8") {
        return H5::PredType::NATIVE_INT8;
    } else if (type == "int16") {
        return H5::PredType::NATIVE_INT16;
    } else if (type == "int32") {
        return H5::PredType::NATIVE_INT32;
    } else if (type == "int64") {
        return H5::PredType::NATIVE_INT64;
    } else if (type == "float32") {
        return H5::PredType::NATIVE_FLOAT;
    } else if (type == "float64") {
        return H5::PredType::NATIVE_DOUBLE;
    } else {
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "[H5FormatUtils::get_dataset_data_type] Unsupported dataset data_type " << type << endl;
        throw runtime_error(error_message.str());
    }
}
// Write a format-definition dataset under `target`.
//
// The value either lives in the struct (IMMEDIATE) or is a string key into
// `values`. Dispatch on the declared NX data type; for each type the
// boost::any payload is probed via trial any_casts (const char* before
// string for the char-like types, since string literals arrive as
// const char*). Throws runtime_error when no cast matches or the NX type
// is unsupported.
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const h5_dataset& dataset,
    const unordered_map<string, boost::any>& values)
{
    const string& name = dataset.name;
    boost::any value;
    // Value is stored directly in the struct.
    if (dataset.data_location == IMMEDIATE){
        value = dataset.value;
    // Value in struct is just a string reference to into the values map.
    } else {
        value = H5FormatUtils::get_value_from_reference(name, dataset.value, values);
    }
    if (dataset.data_type == NX_CHAR || dataset.data_type == NX_DATE_TIME || dataset.data_type == NXnote) {
        // Attempt to convert to const char * (string "literals" cause that).
        try {
            return H5FormatUtils::write_dataset(target, name, string(boost::any_cast<const char*>(value)));
        } catch (const boost::bad_any_cast& exception) {}
        // Atempt to convert to string.
        try {
            return H5FormatUtils::write_dataset(target, name, boost::any_cast<string>(value));
        } catch (const boost::bad_any_cast& exception) {}
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot convert dataset " << name << " to string or const char*." << endl;
        throw runtime_error(error_message.str());
    } else if (dataset.data_type == NX_INT) {
        try {
            return H5FormatUtils::write_dataset(target, name, boost::any_cast<int>(value));
        } catch (const boost::bad_any_cast& exception) {}
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot convert dataset " << name << " to NX_INT." << endl;
        throw runtime_error(error_message.str());
    } else if (dataset.data_type == NX_FLOAT || dataset.data_type == NX_NUMBER) {
        try {
            return H5FormatUtils::write_dataset(target, name, boost::any_cast<double>(value));
        } catch (const boost::bad_any_cast& exception) {}
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot convert dataset " << name << " to NX_FLOAT." << endl;
        throw runtime_error(error_message.str());
    } else {
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Unsupported dataset type for dataset " << name << "." << endl;
        throw runtime_error(error_message.str());
    }
}
// Create and write a scalar double dataset `name` under `target`.
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, double value)
{
    H5::DataSpace scalar_space(H5S_SCALAR);
    const auto& h5_type = H5::PredType::NATIVE_DOUBLE;
    auto created = target.createDataSet(name.c_str(), h5_type, scalar_space);
    created.write(&value, h5_type);
    return created;
}
// Create and write a scalar int dataset `name` under `target`.
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, int value)
{
    H5::DataSpace scalar_space(H5S_SCALAR);
    const auto& h5_type = H5::PredType::NATIVE_INT;
    auto created = target.createDataSet(name.c_str(), h5_type, scalar_space);
    created.write(&value, h5_type);
    return created;
}
// Create and write a scalar variable-length string dataset `name` under
// `target`.
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, const string& value)
{
    H5::DataSpace scalar_space(H5S_SCALAR);
    H5::DataType h5_type = H5::StrType(0, H5T_VARIABLE);
    auto created = target.createDataSet(name.c_str(), h5_type, scalar_space);
    created.write(value, h5_type);
    return created;
}
// Attach a scalar variable-length string attribute `name` to `target`.
void H5FormatUtils::write_attribute(H5::H5Object& target, const string& name, const string& value)
{
    H5::DataSpace scalar_space(H5S_SCALAR);
    H5::DataType h5_type = H5::StrType(H5::PredType::C_S1, H5T_VARIABLE);
    auto attribute = target.createAttribute(name.c_str(), h5_type, scalar_space);
    attribute.write(h5_type, value.c_str());
}
// Attach a scalar int attribute `name` to `target`.
void H5FormatUtils::write_attribute(H5::H5Object& target, const string& name, int value)
{
    H5::DataSpace scalar_space(H5S_SCALAR);
    const auto& h5_type = H5::PredType::NATIVE_INT;
    auto attribute = target.createAttribute(name.c_str(), h5_type, scalar_space);
    attribute.write(h5_type, &value);
}
// Write a format-definition attribute on `target`.
//
// The value either lives in the struct (IMMEDIATE) or is a string key into
// `values`. NX_CHAR values are probed as const char* first (string
// literals arrive as const char*), then as string; NX_INT as int.
// Throws runtime_error when no cast matches or the NX type is unsupported.
void H5FormatUtils::write_attribute(H5::H5Object& target, const h5_attr& attribute,
    const unordered_map<string, boost::any>& values)
{
    string name = attribute.name;
    boost::any value;
    // Value is stored directly in the struct.
    if (attribute.data_location == IMMEDIATE){
        value = attribute.value;
    // Value in struct is just a string reference to into the values map.
    } else {
        value = H5FormatUtils::get_value_from_reference(name, attribute.value, values);
    }
    if (attribute.data_type == NX_CHAR) {
        // Attempt to convert to const char * (string "literals" cause that).
        try {
            H5FormatUtils::write_attribute(target, name, string(boost::any_cast<const char*>(value)));
            return;
        } catch (const boost::bad_any_cast& exception) {}
        // Atempt to convert to string.
        try {
            H5FormatUtils::write_attribute(target, name, boost::any_cast<string>(value));
            return;
        } catch (const boost::bad_any_cast& exception) {}
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot convert attribute " << name << " to string or const char*." << endl;
        throw runtime_error(error_message.str());
    } else if (attribute.data_type == NX_INT) {
        try {
            H5FormatUtils::write_attribute(target, name, boost::any_cast<int>(value));
            return;
        } catch (const boost::bad_any_cast& exception) {}
        // We cannot really convert this attribute.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot convert attribute " << name << " to INT." << endl;
        throw runtime_error(error_message.str());
    } else {
        // Previously attributes of any other NX type fell through and were
        // silently dropped from the output file; fail loudly instead,
        // consistent with H5FormatUtils::write_dataset.
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Unsupported attribute type for attribute " << name << "." << endl;
        throw runtime_error(error_message.str());
    }
}
// Recursively render one node of the format tree into the HDF5 file.
//
// Each child item is dispatched on node_type: GROUP recurses, ATTRIBUTE
// is written on the current group, DATASET is written and may carry its
// own attribute children (anything else inside a dataset is rejected with
// invalid_argument). When format_node itself is a GROUP a new HDF5 group
// is created first; otherwise items are written directly into file_node.
void H5FormatUtils::write_format_data(H5::Group& file_node, const h5_parent& format_node,
    const std::unordered_map<std::string, h5_value>& values)
{
    // Shared item-walker, run either inside a freshly created group or on
    // file_node itself (see below).
    auto process_items = [&format_node, &values](H5::Group& node_group){
        for (const auto item_ptr : format_node.items) {
            const h5_base& item = *item_ptr;
            if (item.node_type == GROUP) {
                auto sub_group = dynamic_cast<const h5_group&>(item);
                write_format_data(node_group, sub_group, values);
            } else if (item.node_type == ATTRIBUTE) {
                auto sub_attribute = dynamic_cast<const h5_attr&>(item);
                H5FormatUtils::write_attribute(node_group, sub_attribute, values);
            } else if (item.node_type == DATASET) {
                auto sub_dataset = dynamic_cast<const h5_dataset&>(item);
                auto current_dataset = H5FormatUtils::write_dataset(node_group, sub_dataset, values);
                for (const auto dataset_attr_ptr : sub_dataset.items) {
                    const h5_base& dataset_attr = *dataset_attr_ptr;
                    // You can specify only attributes inside a dataset.
                    if (dataset_attr.node_type != ATTRIBUTE) {
                        stringstream error_message;
                        using namespace date;
                        error_message << "[" << std::chrono::system_clock::now() << "]";
                        error_message << "Invalid element " << dataset_attr.name << " on dataset " << sub_dataset.name << ". Only attributes allowd.";
                        throw invalid_argument( error_message.str() );
                    }
                    auto sub_attribute = dynamic_cast<const h5_attr&>(dataset_attr);
                    H5FormatUtils::write_attribute(current_dataset, sub_attribute, values);
                }
            }
        }
    };
    if (format_node.node_type == GROUP) {
        auto x = H5FormatUtils::create_group(file_node, format_node.name);
        process_items(x);
    }else {
        process_items(file_node);
    }
}
void H5FormatUtils::write_format(H5::H5File& file, const H5Format& format,
    const std::unordered_map<std::string, h5_value>& input_values)
{
    // Writes the complete structure described by 'format' into 'file'.
    auto definition = format.get_format_definition();

    // Start from the format's defaults, then layer input values and
    // calculated values on top of them.
    auto resolved_values = format.get_default_values();
    format.add_input_values(resolved_values, input_values);
    format.add_calculated_values(resolved_values);

    write_format_data(file, definition, resolved_values);

    // Relocate raw datasets to their final in-file destination.
    for (const auto& move_entry : format.get_dataset_move_mapping()) {
        file.move(move_entry.first, move_entry.second);
    }
}
-320
View File
@@ -1,320 +0,0 @@
#include <cstdio>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <vector>
#include "H5Writer.hpp"
#include "H5Format.hpp"
extern "C"
{
#include "H5DOpublic.h"
}
using namespace std;
// Chunked HDF5 writer.
// filename: output filename; when frames_per_file > 0 it is used as a
//   printf-style template receiving the file chunk number (see create_file).
// frames_per_file: frames per output file; 0 disables file roll-over.
// initial_dataset_size: initial per-dataset frame capacity.
// dataset_increase_step: growth step used when a dataset must be expanded.
H5Writer::H5Writer(const std::string& filename, hsize_t frames_per_file, hsize_t initial_dataset_size,
    hsize_t dataset_increase_step) :
        filename_(filename), frames_per_file(frames_per_file),
        initial_dataset_size(initial_dataset_size), dataset_increase_step(dataset_increase_step)
{
    #ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[H5Writer::H5Writer] Creating chunked writer";
    cout << " with filename " << filename;
    cout << " and frames_per_file " << frames_per_file;
    cout << " and initial_dataset_size " << initial_dataset_size;
    cout << endl;
    #endif
}
// Ensures the open file (if any) is finalized and closed on destruction.
H5Writer::~H5Writer()
{
    close_file();
}
// Finalizes and closes the current file: compacts every dataset to the
// highest written index and stamps image_nr_low/high attributes, then
// resets all per-file bookkeeping. Safe to call when no file is open.
void H5Writer::close_file()
{
    if (is_file_open()) {
        #ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5Writer::close_file] Closing file." << endl;
        #endif

        // First absolute frame index that belongs to this file
        // (current_frame_chunk is 1-based when roll-over is enabled).
        hsize_t min_frame_in_dataset = 0;
        if (frames_per_file) {
            min_frame_in_dataset = (current_frame_chunk - 1) * frames_per_file;
        }

        // max_data_index is relative to the current file.
        hsize_t max_frame_in_dataset = max_data_index + min_frame_in_dataset;

        // Frame indexing starts at 1 (for some reason).
        auto image_nr_low = min_frame_in_dataset + 1;
        auto image_nr_high = max_frame_in_dataset + 1;

        #ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5Writer::close_file] Setting datasets attribute image_nr_low=" << image_nr_low;
        cout << " and image_nr_high=" << image_nr_high << endl;
        #endif

        for (const auto& dataset_map : datasets) {
            auto dataset = dataset_map.second;

            // NOTE(review): compact_dataset receives the max *index*; verify
            // it treats this as index rather than frame count (off-by-one).
            H5FormatUtils::compact_dataset(dataset, max_data_index);
            H5FormatUtils::write_attribute(dataset, "image_nr_low", image_nr_low);
            H5FormatUtils::write_attribute(dataset, "image_nr_high", image_nr_high);
        }

        file.close();
    } else {
        #ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5Writer::close_file] Trying to close an already closed file." << endl;
        #endif
    }

    // Cleanup.
    datasets.clear();
    datasets_current_size.clear();
    current_frame_chunk = 0;
    max_data_index = 0;
}
void H5Writer::write_data(const string& dataset_name, const size_t data_index, const char* data,
const std::vector<size_t>& data_shape, const size_t data_bytes_size, const string& data_type, const string& endianness)
{
try {
// Define the ofset of the currently received image in the file.
hsize_t relative_data_index = prepare_storage_for_data(dataset_name, data_index, data_shape, data_type, endianness);
// Define the offset where to write the data.
size_t data_rank = data_shape.size();
hsize_t offset[data_rank+1];
offset[0] = relative_data_index;
for (uint index=0; index<data_rank; ++index) {
offset[index+1] = 0;
}
// No compression for now.
uint32_t filters = 0;
const auto& dataset = datasets.at(dataset_name);
if( H5DOwrite_chunk(dataset.getId(), H5P_DEFAULT, filters, offset, data_bytes_size, data) )
{
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Error while writing dataset " << dataset_name << " chunk to file at offset ";
error_message << relative_data_index << "." << endl;
throw invalid_argument( error_message.str() );
}
} catch (...) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::write_data] Error while trying to write data to dataset " << dataset_name << endl;
throw;
}
}
void H5Writer::create_dataset(const string& dataset_name, const vector<size_t>& data_shape,
const string& data_type, const string& endianness, bool chunked, hsize_t dataset_size)
{
// Number of dimensions in each data point.
const size_t data_rank = data_shape.size();
// The +1 dimension is to account for the sequence of data points (time).
const hsize_t dataset_rank = data_rank + 1;
hsize_t dataset_dimension[dataset_rank];
hsize_t max_dataset_dimension[dataset_rank];
hsize_t dataset_chunking[dataset_rank];
// This should be equivalent to the total number of frames in this file.
dataset_dimension[0] = dataset_size;
// The maximum dataset size is the same as the number of images.
max_dataset_dimension[0] = dataset_size;
// Chunking is always set to a single data point.
dataset_chunking[0] = 1;
for (size_t index=0; index<data_rank; ++index) {
dataset_dimension[index+1] = data_shape[index];
max_dataset_dimension[index+1] = data_shape[index];
dataset_chunking[index+1] = data_shape[index];
}
// Create a chunked dataset if needed.
H5::DSetCreatPropList dataset_properties;
if (chunked) {
dataset_properties.setChunk(dataset_rank, dataset_chunking);
// Chunked datasets can be resized without limits.
max_dataset_dimension[0] = H5S_UNLIMITED;
}
H5::DataSpace dataspace(dataset_rank, dataset_dimension, max_dataset_dimension);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::create_dataset] Creating dataspace of size (";
for (hsize_t i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif
H5::AtomType dataset_data_type(H5FormatUtils::get_dataset_data_type(data_type));
if (endianness == "big") {
dataset_data_type.setOrder(H5T_ORDER_BE);
} else {
dataset_data_type.setOrder(H5T_ORDER_LE);
}
auto dataset = file.createDataSet(dataset_name.c_str(), dataset_data_type, dataspace, dataset_properties);
datasets.insert({dataset_name, dataset});
datasets_current_size.insert({dataset_name, initial_dataset_size});
}
// Re-targets the writer to a new filename (template) and opens the file
// for the given chunk number.
void H5Writer::create_file(const string& filename, const hsize_t frame_chunk)
{
    filename_ = filename;
    create_file(frame_chunk);
}
// Opens (truncating) the output file for the given chunk number, closing a
// previously open file first. When frames_per_file > 0, filename_ is treated
// as a printf-style template receiving frame_chunk.
// Throws runtime_error if the file cannot be created.
void H5Writer::create_file(hsize_t frame_chunk)
{
    if (file.getId() != -1) {
        close_file();
    }

    auto target_filename = filename_;

    // In case frames_per_file is > 0, the filename variable is a template for the filename.
    if (frames_per_file) {
        #ifdef DEBUG_OUTPUT
        using namespace date;
        cout << "[" << std::chrono::system_clock::now() << "]";
        cout << "[H5Writer::create_file] Frames per file is defined. Format " << filename_ << " with frame_chunk " << frame_chunk << endl;
        #endif

        // Heap buffer + snprintf replace the original VLA + sprintf:
        // snprintf cannot overflow the buffer, and 32 extra bytes cover any
        // formatted integer.
        std::vector<char> buffer(filename_.length() + 32);
        snprintf(buffer.data(), buffer.size(), filename_.c_str(), frame_chunk);
        target_filename = string(buffer.data());
    }

    #ifdef DEBUG_OUTPUT
    using namespace date;
    cout << "[" << std::chrono::system_clock::now() << "]";
    cout << "[H5Writer::create_file] Creating filename " << target_filename << endl;
    #endif

    file = H5::H5File(target_filename.c_str(), H5F_ACC_TRUNC);

    if (file.getId() == -1) {
        stringstream error_message;
        using namespace date;
        error_message << "[" << std::chrono::system_clock::now() << "]";
        error_message << "Cannot create new file with filename " << target_filename << endl;

        throw runtime_error(error_message.str());
    }

    // New file created - set this files chunk number.
    current_frame_chunk = frame_chunk;
}
// A file counts as open while its HDF5 id is valid (anything but -1).
bool H5Writer::is_file_open() const
{
    return file.getId() != -1;
}
// Maps an absolute frame index to its position inside the destination file.
size_t H5Writer::get_relative_data_index(const size_t data_index)
{
    // Without file roll-over the absolute index is already file-relative.
    if (frames_per_file == 0) {
        return data_index;
    }

    // Equivalent to data_index - (data_index / frames_per_file) * frames_per_file.
    return data_index % frames_per_file;
}
inline bool H5Writer::is_data_for_current_file(const size_t data_index)
{
if (frames_per_file) {
hsize_t frame_chunk = (data_index / frames_per_file) + 1;
// This frames does not go into this file.
if (frame_chunk != current_frame_chunk) {
return false;
}
}
return true;
}
// Ensures file, dataset and dataset capacity exist for the frame at
// data_index; returns the frame's file-relative index.
hsize_t H5Writer::prepare_storage_for_data(const string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
    const string& data_type, const string& endianness)
{
    // Check if we have to create a new file.
    if (!is_data_for_current_file(data_index)) {
        // Calculate to which file (1 based) the data_index belongs.
        hsize_t frame_chunk = (data_index / frames_per_file) + 1;
        create_file(frame_chunk);
    }

    // Open the file if needed.
    if (!is_file_open()) {
        create_file();
    }

    // Create the dataset if we don't have it yet.
    if (datasets.find(dataset_name) == datasets.end()) {
        create_dataset(dataset_name, data_shape, data_type, endianness, true, initial_dataset_size);
    }

    hsize_t current_dataset_size = datasets_current_size.at(dataset_name);
    hsize_t relative_data_index = get_relative_data_index(data_index);

    // Bugfix (off-by-one): a dataset of size N holds indices 0..N-1, so the
    // dataset must also be expanded when relative_data_index == N.
    // The original used '>' and skipped expansion in that boundary case.
    if (relative_data_index >= current_dataset_size) {
        auto dataset = datasets.at(dataset_name);
        hsize_t new_dataset_size = H5FormatUtils::expand_dataset(dataset, relative_data_index, dataset_increase_step);
        datasets_current_size[dataset_name] = new_dataset_size;
    }

    // Keep track of the max index in this file - needed for shrinking the dataset at the end.
    if (relative_data_index > max_data_index) {
        max_data_index = relative_data_index;
    }

    return relative_data_index;
}
// Exposes the underlying HDF5 file handle (valid only while a file is open).
H5::H5File& H5Writer::get_h5_file()
{
    return file;
}
-104
View File
@@ -1,104 +0,0 @@
#include <iostream>
#include <stdexcept>
#include "date.h"
#include "MetadataBuffer.hpp"
using namespace std;
// Pre-allocates one zero-initialized buffer per declared header field, each
// holding n_slots frames of that field's (shape * element size) bytes.
MetadataBuffer::MetadataBuffer(
    const uint64_t n_slots,
    const header_map& header_values_type) :
        n_slots_(n_slots),
        header_values_type_(header_values_type)
{
    // Iterating an empty map is a no-op, so no explicit empty() guard is needed.
    for (const auto& entry : header_values_type_) {
        const auto& name = entry.first;
        const auto& data_type = entry.second;

        const size_t frame_bytes = data_type.value_shape * data_type.value_bytes_size;
        const size_t total_bytes = n_slots_ * frame_bytes;

        // Value-initializing new char[...]() zeroes the buffer; the array
        // deleter is required because shared_ptr<char> holds an array.
        shared_ptr<char> slot_buffer(
            new char[total_bytes](),
            std::default_delete<char[]>());

        metadata_buffer.insert({name, slot_buffer});
        metadata_length_bytes.insert({name, frame_bytes});
    }
}
// Copies one frame's worth of the named header field into slot frame_index.
// Throws runtime_error for an out-of-range slot or an undeclared field.
void MetadataBuffer::add_metadata_to_buffer(
    const string& name,
    const uint64_t frame_index,
    const char* data)
{
    // Valid slots are 0 .. n_slots_ - 1.
    if (frame_index >= n_slots_) {
        stringstream err_msg;
        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "] ";
        err_msg << "[MetadataBuffer::add_metadata_to_buffer]";
        err_msg << " Requested frame_index " << frame_index ;
        err_msg << " is out of range.";
        err_msg << " Available n_slots_ " << n_slots_ << endl;
        throw runtime_error(err_msg.str());
    }

    const auto buffer_it = metadata_buffer.find(name);
    if (buffer_it == metadata_buffer.end()) {
        stringstream err_msg;
        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "] ";
        err_msg << "[MetadataBuffer::add_metadata_to_buffer]";
        err_msg << " Undeclared header metadata " << name << endl;
        throw runtime_error(err_msg.str());
    }

    // Destination is slot frame_index inside this field's backing buffer.
    const size_t frame_bytes = metadata_length_bytes.at(name);
    char* destination = buffer_it->second.get() + (frame_index * frame_bytes);

    memcpy(destination, data, frame_bytes);
}
// Returns the backing buffer for the named header field.
// Throws runtime_error if the field was never declared.
shared_ptr<char> MetadataBuffer::get_metadata_values(string name)
{
    const auto buffer_it = metadata_buffer.find(name);

    if (buffer_it == metadata_buffer.end()) {
        stringstream err_msg;
        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "] ";
        err_msg << "[MetadataBuffer::get_metadata_values]";
        err_msg << " Undeclared header metadata " << name << endl;
        throw runtime_error(err_msg.str());
    }

    return buffer_it->second;
}
// Returns the field-name -> header data type mapping this buffer was built with.
const MetadataBuffer::header_map& MetadataBuffer::get_header_values_type()
{
    return header_values_type_;
}
// Returns the number of frame slots each field buffer holds.
uint64_t MetadataBuffer::get_n_slots()
{
    return n_slots_;
}
-10
View File
@@ -1,10 +0,0 @@
add_executable(core-writer_tests test_main.cpp)

# Test-only link dependencies: use PRIVATE so they do not propagate.
# (The keyword-less target_link_libraries signature has legacy semantics
# and should not be mixed with modern usage.)
target_link_libraries(core-writer_tests
    PRIVATE
        core-writer
        external
        gtest
        zmq
        hdf5
        hdf5_hl
        hdf5_cpp)
-75
View File
@@ -1,75 +0,0 @@
#include <memory>
#include <unordered_map>
#include <iostream>
#include "config.hpp"
#include "H5Format.hpp"
using namespace std;
using s_ptr = shared_ptr<h5_base>;

// Minimal H5Format implementation for unit tests:
// - declares no input value types and no default values,
// - maps input values 1:1,
// - defines a single empty "detector" group,
// - moves the raw image dataset to "detector/data" after writing.
class TestH5Format : public H5Format
{
    // shared_ptr members default-construct to null; '= NULL' (the original
    // initializer) is redundant and pre-C++11 style.
    shared_ptr<unordered_map<string, DATA_TYPE>> input_value_type;
    shared_ptr<unordered_map<string, boost::any>> default_values;
    shared_ptr<unordered_map<string, std::string>> dataset_move_mapping;
    shared_ptr<h5_parent> file_format;

public:
    ~TestH5Format() {}

    // dataset_name is accepted for signature compatibility with other
    // formats; this test format does not use it.
    TestH5Format(const string& dataset_name)
    {
        input_value_type.reset(new unordered_map<string, DATA_TYPE>());
        default_values.reset(new unordered_map<string, boost::any>());

        // After format has been written, where to move the raw datasets.
        dataset_move_mapping.reset(
            new std::unordered_map<string, string>({
                {config::raw_image_dataset_name, "detector/data"}
            }));

        // Definition of the file format.
        file_format.reset(
            new h5_parent("", EMPTY_ROOT, {
                s_ptr(new h5_group("detector", {}))
            }));
    }

    const h5_parent& get_format_definition() const override
    {
        return *file_format;
    }

    const unordered_map<string, boost::any>& get_default_values() const override
    {
        return *default_values;
    }

    void add_calculated_values(unordered_map<string, boost::any>& values) const override
    {
        // No calculated values.
    }

    // Input value mapping is 1:1 - every provided value is copied through.
    void add_input_values(unordered_map<string, boost::any>& values,
        const unordered_map<string, boost::any>& input_values) const override
    {
        for (const auto& input_value : input_values) {
            const auto& name = input_value.first;
            const auto& value = input_value.second;

            values[name] = value;
        }
    }

    const std::unordered_map<string, DATA_TYPE>& get_input_value_type() const override
    {
        return *input_value_type;
    }

    const unordered_map<string, string>& get_dataset_move_mapping() const override {
        return *dataset_move_mapping;
    }
};
-28
View File
@@ -1,28 +0,0 @@
#ifndef MOCK_STREAM_H
#define MOCK_STREAM_H
#include <zmq.hpp>
const std::string MOCK_STREAM_ADDRESS("tcp://127.0.0.1:11000");

// Test helper: binds a PUSH socket on MOCK_STREAM_ADDRESS and pushes
// n_messages two-part messages (fixed JSON header + 16-byte payload).
void generate_stream(size_t n_messages)
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_PUSH);
    socket.bind(MOCK_STREAM_ADDRESS);

    std::string header =
        "{\"frame\": 0, \"shape\": [1,16], \"type\": \"uint8\"}";
    zmq::const_buffer header_msg(header.c_str(), header.length());

    // Fixed-size, zero-initialized payload. The original declared a VLA
    // (buffer_size was not constexpr) and sent uninitialized bytes.
    constexpr size_t buffer_size = 16;
    char buffer[buffer_size] = {0};
    zmq::const_buffer buffer_msg(buffer, buffer_size);

    for (size_t i=0; i<n_messages; i++) {
        socket.send(header_msg, zmq::send_flags::sndmore);
        socket.send(buffer_msg);
    }
}
#endif
-8
View File
@@ -1,8 +0,0 @@
#include "gtest/gtest.h"
#include "BufferedWriter.hpp"
using namespace std;
// Placeholder test - no BufferedWriter assertions have been written yet.
TEST(BufferedWriter, basic_interaction)
{
}
-74
View File
@@ -1,74 +0,0 @@
#include "gtest/gtest.h"
#include "H5WriteModule.hpp"
#include <string>
#include "RingBuffer.hpp"
#include "mock/TestH5Format.cpp"
using namespace std;
// Shape * 2 bytes/value (uint16_t)
size_t image_n_bytes = 1024*2*2;

// Test helper: pushes n_frames synthetic 2x1024 uint16 frames into the ring
// buffer; every pixel of frame i holds the value i.
void generate_frames(RingBuffer<FrameMetadata>& ring_buffer, int n_frames)
{
    size_t y_length = 2;
    size_t x_length = 1024;

    for (int i_frame=0; i_frame < n_frames; i_frame++) {
        FrameMetadata metadata = {
            0, // size_t buffer_slot_index;
            image_n_bytes, // size_t frame_bytes_size;
            static_cast<uint64_t>(i_frame), // uint64_t frame_index;
            "little", // std::string endianness;
            "uint16", //std::string type;
            {y_length, x_length} // std::vector<size_t> frame_shape;
        };

        auto ptr_metadata = make_shared<FrameMetadata>(metadata);
        auto ptr_buffer = ring_buffer.reserve(ptr_metadata);

        // Fill the reserved slot so the written file contents are predictable.
        auto value_ptr = (uint16_t*) ptr_buffer;
        for (size_t y=0; y<y_length; y++) {
            for (size_t x=0; x<x_length; x++) {
                size_t offset = y*x_length + x;
                value_ptr[offset] = static_cast<uint16_t>(i_frame);
            }
        }

        // Publish the slot to consumers.
        ring_buffer.commit(ptr_metadata);
    }
}
// End-to-end check of H5WriteModule: writing stops automatically once the
// requested number of frames (5) has been received.
// NOTE(review): relies on 100 ms sleeps for thread synchronization - can be
// flaky on a loaded machine.
TEST(H5WriteModule, basic_interaction)
{
    TestH5Format format("start_dataset");
    RingBuffer<FrameMetadata> ring_buffer(10);
    ring_buffer.initialize(image_n_bytes);

    std::unordered_map<std::string, HeaderDataType> header_map;

    H5WriteModule h5_write_module(ring_buffer, header_map, format);
    ASSERT_FALSE(h5_write_module.is_writing());

    h5_write_module.start_writing("ignore_out.h5", 5);
    ASSERT_TRUE(h5_write_module.is_writing());

    // 3 of 5 requested frames: the module must still be writing.
    generate_frames(ring_buffer, 3);
    this_thread::sleep_for(chrono::milliseconds(100));
    ASSERT_TRUE(h5_write_module.is_writing());

    generate_frames(ring_buffer, 2);
    this_thread::sleep_for(chrono::milliseconds(100));

    // Writing should be completed by now.
    ASSERT_FALSE(h5_write_module.is_writing());

    // Stop should never throw an exception.
    h5_write_module.stop_writing();
    EXPECT_NO_THROW(h5_write_module.stop_writing());

    EXPECT_TRUE(ring_buffer.is_empty());
}
-8
View File
@@ -1,8 +0,0 @@
#include "gtest/gtest.h"
#include "H5Writer.hpp"
using namespace std;
// Placeholder test - no H5Writer assertions have been written yet.
TEST(H5Writer, basic_interaction)
{
}
-71
View File
@@ -1,71 +0,0 @@
#include "gtest/gtest.h"
#include "ProcessManager.hpp"
#include "mock/stream.hpp"
// Exercises the ProcessManager state machine: idle -> ready -> writing and
// back, including ring buffer clearing between write sessions.
// NOTE(review): relies on 100 ms sleeps for thread synchronization - can be
// flaky on a loaded machine.
TEST(ProcessManager, basic_interaction)
{
    TestH5Format format("start_dataset");
    RingBuffer<FrameMetadata> ring_buffer(10);
    std::unordered_map<std::string, HeaderDataType> header_map;

    ZmqRecvModule recv_module(ring_buffer, header_map);
    H5WriteModule write_module(ring_buffer, header_map, format);

    ProcessManager manager(write_module, recv_module);

    string output_file("ignore_out.h5");
    int n_frames = 5;
    int user_id = -1;
    size_t n_msg = 10;

    // Writing before receiving has started must be rejected.
    EXPECT_THROW(
        manager.start_writing(output_file, n_frames, user_id),
        runtime_error);

    thread sender(generate_stream, n_msg);

    EXPECT_EQ(manager.get_status(), "idle");
    manager.start_receiving(MOCK_STREAM_ADDRESS, 3);
    EXPECT_EQ(manager.get_status(), "ready");

    this_thread::sleep_for(chrono::milliseconds(100));
    sender.join();

    // Start with an empty RB.
    EXPECT_TRUE(ring_buffer.is_empty());

    // Write first 5 images you receive.
    manager.start_writing(output_file, 5, user_id);
    EXPECT_EQ(manager.get_status(), "writing");

    // Send 6 images, so 1 will be left in the RB.
    thread sender2(generate_stream, 6);
    this_thread::sleep_for(chrono::milliseconds(100));
    sender2.join();

    this_thread::sleep_for(chrono::milliseconds(100));

    // Writer stopped because it received all frames.
    EXPECT_FALSE(write_module.is_writing());
    EXPECT_EQ(manager.get_status(), "ready");

    // But there should be one more frame in the RB.
    EXPECT_FALSE(ring_buffer.is_empty());

    // When restarting the writing, RB should be cleared first.
    manager.start_writing(output_file, 5, user_id);

    // Send exact number of needed frames.
    thread sender3(generate_stream, 5);
    this_thread::sleep_for(chrono::milliseconds(100));
    sender3.join();

    // Writer should complete.
    EXPECT_FALSE(write_module.is_writing());
    // There should be no frames left in the RB.
    EXPECT_TRUE(ring_buffer.is_empty());

    manager.stop_receiving();

    this_thread::sleep_for(chrono::milliseconds(100));
    EXPECT_EQ(manager.get_status(), "idle");
}
-171
View File
@@ -1,171 +0,0 @@
#include <thread>
#include "gtest/gtest.h"
#include "ZmqReceiver.hpp"
#include "mock/stream.hpp"
using namespace std;
namespace pt = boost::property_tree;
// Element byte sizes reported for every supported dtype string.
TEST(ZmqReceiver, get_type_byte_size)
{
    EXPECT_TRUE(get_type_byte_size("uint8") == 1);
    EXPECT_TRUE(get_type_byte_size("int8") == 1);
    EXPECT_TRUE(get_type_byte_size("uint16") == 2);
    EXPECT_TRUE(get_type_byte_size("int16") == 2);
    EXPECT_TRUE(get_type_byte_size("uint32") == 4);
    EXPECT_TRUE(get_type_byte_size("int32") == 4);
    EXPECT_TRUE(get_type_byte_size("float32") == 4);
    EXPECT_TRUE(get_type_byte_size("uint64") == 8);
    EXPECT_TRUE(get_type_byte_size("int64") == 8);
    EXPECT_TRUE(get_type_byte_size("float64") == 8);
}
// HeaderDataType("float64", 4): derives 8-byte element size from the type
// string and defaults endianness to "little".
TEST(ZmqReceiver, HeaderDataType)
{
    HeaderDataType header_data_type("float64", 4);

    ASSERT_TRUE(header_data_type.type == "float64");
    ASSERT_TRUE(header_data_type.value_bytes_size == 8);
    ASSERT_TRUE(header_data_type.value_shape == 4);
    ASSERT_TRUE(header_data_type.endianness == "little");
}
// get_value_from_json: decodes a scalar uint64 and a float64 array from a
// boost property tree into raw buffers.
TEST(ZmqReceiver, get_value_from_json)
{
    pt::ptree json_header;

    // Scalar case.
    uint64_t frame_number = 1234567890;
    json_header.add("frame_number", frame_number);

    HeaderDataType header_data_type_scalar("uint64");

    auto scalar_buffer = get_value_from_json(json_header, "frame_number", header_data_type_scalar);
    auto scalar_value = reinterpret_cast<uint64_t*>(scalar_buffer.get());

    ASSERT_TRUE(*scalar_value == frame_number);

    // Array case: ptree arrays are children with empty keys.
    double modules_number[] = {-345.12, 1234567.43, -2323456.32};

    pt::ptree modulus_number_child;
    for (int i=0; i<3; i++) {
        pt::ptree value;
        value.put("", modules_number[i]);
        modulus_number_child.push_back(make_pair("", value));
    }
    json_header.add_child("modules_number", modulus_number_child);

    HeaderDataType header_data_type_array("float64", 3);

    auto array_buffer = get_value_from_json(json_header, "modules_number", header_data_type_array);
    auto array_values = reinterpret_cast<double*>(array_buffer.get());

    for (int i=0; i<3; i++) {
        ASSERT_TRUE(array_values[i] == modules_number[i]);
    }
}
// read_json_header: parses a realistic detector JSON header and checks frame
// metadata plus every declared header value (scalars and 1-element arrays).
TEST(ZmqReceiver, read_json_header)
{
    int n_modules = 1;

    // Declared header fields and their expected wire types.
    unordered_map<string, HeaderDataType> header_values = {
        {"pulse_id", HeaderDataType("uint64")},
        {"frame", HeaderDataType("uint64")},
        {"is_good_frame", HeaderDataType("uint64")},
        {"daq_rec", HeaderDataType("int64")},

        {"pulse_id_diff", HeaderDataType("int64", n_modules)},
        {"framenum_diff", HeaderDataType("int64", n_modules)},

        {"missing_packets_1", HeaderDataType("uint64", n_modules)},
        {"missing_packets_2", HeaderDataType("uint64", n_modules)},
        {"daq_recs", HeaderDataType("uint64", n_modules)},

        {"pulse_ids", HeaderDataType("uint64", n_modules)},
        {"framenums", HeaderDataType("uint64", n_modules)},

        {"module_number", HeaderDataType("uint64", n_modules)}
    };

    ZmqReceiver receiver(header_values);

    auto header_string = "{\"missing_packets_2\":[2],"
                         "\"missing_packets_1\":[1],"
                         "\"frame\":0,"
                         "\"daq_recs\":[3840],"
                         "\"module_number\":[0],"
                         "\"shape\":[512,1024],"
                         "\"pulse_id\":6021771850,"
                         "\"framenum_diff\":[-2],"
                         "\"pulse_ids\":[6021771850],"
                         "\"is_good_frame\":1,"
                         "\"framenums\":[193],"
                         "\"pulse_id_diff\":[-1],"
                         "\"daq_rec\":-1,"
                         "\"type\":\"uint16\","
                         "\"htype\":\"array-1.0\"}";

    auto metadata = receiver.read_json_header(header_string);

    // Frame-level metadata.
    ASSERT_TRUE(metadata->frame_index == 0);
    ASSERT_TRUE(metadata->endianness == "little");
    ASSERT_TRUE(metadata->type == "uint16");
    ASSERT_TRUE(metadata->frame_shape[0] == 512);
    ASSERT_TRUE(metadata->frame_shape[1] == 1024);

    // Declared header values, read back from their raw buffers.
    auto pulse_id = reinterpret_cast<uint64_t*>(metadata->header_values.at("pulse_id").get());
    ASSERT_TRUE(pulse_id[0] == 6021771850);

    auto frame = reinterpret_cast<uint64_t*>(metadata->header_values.at("frame").get());
    ASSERT_TRUE(frame[0] == 0);

    auto is_good_frame = reinterpret_cast<uint64_t*>(metadata->header_values.at("is_good_frame").get());
    ASSERT_TRUE(is_good_frame[0] == 1);

    auto daq_rec = reinterpret_cast<int64_t*>(metadata->header_values.at("daq_rec").get());
    ASSERT_TRUE(daq_rec[0] == -1);

    auto pulse_id_diff = reinterpret_cast<int64_t*>(metadata->header_values.at("pulse_id_diff").get());
    ASSERT_TRUE(pulse_id_diff[0] == -1);

    auto framenum_diff = reinterpret_cast<int64_t*>(metadata->header_values.at("framenum_diff").get());
    ASSERT_TRUE(framenum_diff[0] == -2);

    auto missing_packets_1 = reinterpret_cast<uint64_t*>(metadata->header_values.at("missing_packets_1").get());
    ASSERT_TRUE(missing_packets_1[0] == 1);

    auto missing_packets_2 = reinterpret_cast<uint64_t*>(metadata->header_values.at("missing_packets_2").get());
    ASSERT_TRUE(missing_packets_2[0] == 2);

    auto daq_recs = reinterpret_cast<uint64_t*>(metadata->header_values.at("daq_recs").get());
    ASSERT_TRUE(daq_recs[0] == 3840);

    auto pulse_ids = reinterpret_cast<uint64_t*>(metadata->header_values.at("pulse_ids").get());
    ASSERT_TRUE(pulse_ids[0] == 6021771850);

    auto framenums = reinterpret_cast<uint64_t*>(metadata->header_values.at("framenums").get());
    ASSERT_TRUE(framenums[0] == 193);

    auto module_number = reinterpret_cast<uint64_t*>(metadata->header_values.at("module_number").get());
    ASSERT_TRUE(module_number[0] == 0);
}
// End-to-end receive: all n_msg messages from the mock stream arrive.
// NOTE(review): relies on a 100 ms sleep for the connection to establish.
TEST(ZmqReceiver, simple_recv)
{
    size_t n_msg = 10;
    thread sender(generate_stream, n_msg);

    RingBuffer<FrameMetadata> ring_buffer(n_msg);
    ZmqReceiver receiver({});

    receiver.connect(MOCK_STREAM_ADDRESS, 100);
    this_thread::sleep_for(chrono::milliseconds(100));

    for (size_t i=0; i<n_msg; i++) {
        auto data = receiver.receive();
        EXPECT_TRUE(data.first != nullptr);
    }

    sender.join();
}
-105
View File
@@ -1,105 +0,0 @@
#include "gtest/gtest.h"
#include "ZmqRecvModule.hpp"
#include <thread>
#include <string>
#include "RingBuffer.hpp"
#include "mock/stream.hpp"
using namespace std;
// State-machine checks: double start_recv throws, double start_saving and all
// stop calls are idempotent.
TEST(ZmqRecvModule, basic_interaction)
{
    RingBuffer<FrameMetadata> ring_buffer(10);
    ZmqRecvModule zmq_recv_module(ring_buffer, {});

    uint8_t n_receivers = 4;

    // Consistency: use the shared mock constant (same value) instead of
    // repeating the "tcp://127.0.0.1:11000" literal, as the other tests do.
    zmq_recv_module.start_recv(MOCK_STREAM_ADDRESS, n_receivers);
    EXPECT_THROW(
        zmq_recv_module.start_recv(MOCK_STREAM_ADDRESS, n_receivers),
        runtime_error);

    zmq_recv_module.start_saving();
    EXPECT_NO_THROW(zmq_recv_module.start_saving());

    // Stop should never throw an exception.
    zmq_recv_module.stop_recv();
    EXPECT_NO_THROW(zmq_recv_module.stop_recv());

    zmq_recv_module.stop_saving_and_clear_buffer();
    EXPECT_NO_THROW(zmq_recv_module.stop_saving_and_clear_buffer());
}
// Receive path: exactly n_msg frames land in the ring buffer and the buffer
// is empty after draining them.
// NOTE(review): relies on a 100 ms sleep for delivery - can be flaky.
TEST(ZmqRecvModule, simple_recv)
{
    size_t n_msg = 10;
    thread sender(generate_stream, n_msg);

    RingBuffer<FrameMetadata> ring_buffer(n_msg);
    ZmqRecvModule zmq_recv_module(ring_buffer, {});

    zmq_recv_module.start_saving();
    zmq_recv_module.start_recv(MOCK_STREAM_ADDRESS, 4);

    sender.join();
    this_thread::sleep_for(chrono::milliseconds(100));

    zmq_recv_module.stop_recv();

    for (size_t i=0;i<n_msg;i++) {
        auto data = ring_buffer.read();

        // nullptr means there is no data in the buffer.
        ASSERT_TRUE(data.first != nullptr);
        ASSERT_TRUE(data.second != nullptr);

        ring_buffer.release(data.first->buffer_slot_index);
    }

    // no more messages in the buffer.
    auto data = ring_buffer.read();
    ASSERT_TRUE(data.first == nullptr);
    ASSERT_TRUE(ring_buffer.is_empty());
}
// stop_saving_and_clear_buffer empties the ring buffer and disables saving;
// stop_recv alone leaves buffered frames intact.
// NOTE(review): relies on 100 ms sleeps for delivery - can be flaky.
TEST(ZmqRecvModule, stop_saving_and_clear_buffer)
{
    size_t n_msg = 10;
    thread sender(generate_stream, n_msg);

    RingBuffer<FrameMetadata> ring_buffer(n_msg);
    ZmqRecvModule zmq_recv_module(ring_buffer, {});

    zmq_recv_module.start_saving();
    // Consistency: use the shared mock constant (same value) instead of
    // repeating the "tcp://127.0.0.1:11000" literal, as the other tests do.
    zmq_recv_module.start_recv(MOCK_STREAM_ADDRESS, 4);

    sender.join();
    this_thread::sleep_for(chrono::milliseconds(100));

    ASSERT_FALSE(ring_buffer.is_empty());

    zmq_recv_module.stop_saving_and_clear_buffer();
    ASSERT_TRUE(ring_buffer.is_empty());

    thread sender2(generate_stream, 2);
    sender2.join();
    this_thread::sleep_for(chrono::milliseconds(100));

    // No messages should be saved from this run.
    ASSERT_TRUE(ring_buffer.is_empty());

    zmq_recv_module.start_saving();

    thread sender3(generate_stream, 2);
    sender3.join();
    this_thread::sleep_for(chrono::milliseconds(100));

    ASSERT_FALSE(ring_buffer.is_empty());

    // stop_recv does not invalidate the buffer.
    zmq_recv_module.stop_recv();
    ASSERT_FALSE(ring_buffer.is_empty());

    zmq_recv_module.stop_saving_and_clear_buffer();
    ASSERT_TRUE(ring_buffer.is_empty());
}
-28
View File
@@ -1,28 +0,0 @@
#include "gtest/gtest.h"
#include "H5Writer.hpp"
using namespace std;
// Factory: "/dev/null" yields a DummyH5Writer, a real path yields a real writer.
TEST(H5Writer, get_h5_writer)
{
    auto dummy_writer = get_h5_writer("/dev/null");
    EXPECT_TRUE(dynamic_cast<DummyH5Writer*>(dummy_writer.get()));

    auto real_writer = get_h5_writer("real_file.h5");
    EXPECT_FALSE(dynamic_cast<DummyH5Writer*>(real_writer.get()));
}
// DummyH5Writer: never has a file, tolerates close/write, refuses get_h5_file.
TEST(H5Writer, DummyH5Writer)
{
    DummyH5Writer dummy_writer;
    EXPECT_FALSE(dummy_writer.is_file_open());
    EXPECT_NO_THROW(dummy_writer.close_file());
    EXPECT_THROW(dummy_writer.get_h5_file(), runtime_error);

    // Bugfix: the buffer is allocated with new[], so the array form
    // unique_ptr<char[]> is required - unique_ptr<char> (the original)
    // would call delete on new[] memory, which is undefined behavior.
    unique_ptr<char[]> buffer(new char[1]);
    vector<size_t> shape = {1};

    EXPECT_NO_THROW(dummy_writer.write_data("does not matter", 0, buffer.get(), shape, 0, "nop", "nop"));
}
-15
View File
@@ -1,15 +0,0 @@
#include "gtest/gtest.h"
#include "test_ZmqReceiver.cpp"
#include "test_H5Writer.cpp"
#include "test_MetadataBuffer.cpp"
#include "test_BufferedWriter.cpp"
#include "test_ZmqRecvModule.cpp"
#include "test_H5WriteModule.cpp"
#include "test_ProcessManager.cpp"
using namespace std;
// Test runner entry point: initializes gtest and runs every included suite.
int main(int argc, char **argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
+248
View File
@@ -0,0 +1,248 @@
#include <iostream>
#include <thread>
#include "jungfrau.hpp"
#include "BufferUtils.hpp"
#include "zmq.h"
#include "buffer_config.hpp"
#include <H5Cpp.h>
#include <cstring>
#include "date.h"
using namespace std;
using namespace core_buffer;
// Per-frame metadata for one read block: parallel arrays of
// REPLAY_READ_BLOCK_SIZE entries, one element per frame in the block.
struct FileBufferMetadata {
    uint64_t pulse_id[REPLAY_READ_BLOCK_SIZE];
    uint64_t frame_index[REPLAY_READ_BLOCK_SIZE];
    uint32_t daq_rec[REPLAY_READ_BLOCK_SIZE];
    uint16_t n_received_packets[REPLAY_READ_BLOCK_SIZE];
};
// Reads one block of REPLAY_READ_BLOCK_SIZE frames, starting at start_index,
// from an HDF5 buffer file into the provided buffers.
// metadata_buffer: receives pulse_id / frame_index / daq_rec / packet counts.
// image_buffer: receives REPLAY_READ_BLOCK_SIZE 512x1024 uint16 images.
// The file is expected to hold FILE_MOD frames; start_index + block size must
// not exceed that (not checked here).
void load_data_from_file (
    FileBufferMetadata* metadata_buffer,
    char* image_buffer,
    const string &filename,
    const size_t start_index)
{
    // Memory-side image dataspace: the whole block, selected from offset 0.
    hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024};
    H5::DataSpace b_i_space (3, b_image_dim);
    hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024};
    hsize_t b_i_start[] = {0, 0, 0};
    b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start);

    // File-side image dataspace: same block shape, selected at start_index.
    hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024};
    H5::DataSpace f_i_space (3, f_image_dim);
    hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024};
    hsize_t f_i_start[] = {start_index, 0, 0};
    f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start);

    // Memory-side metadata dataspace (block x 1 scalar per frame).
    hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1};
    H5::DataSpace b_m_space (2, b_metadata_dim);
    hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1};
    hsize_t b_m_start[] = {0, 0};
    b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start);

    // File-side metadata dataspace, selected at start_index.
    hsize_t f_metadata_dim[2] = {FILE_MOD, 1};
    H5::DataSpace f_m_space (2, f_metadata_dim);
    hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1};
    hsize_t f_m_start[] = {start_index, 0};
    f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start);

    H5::H5File input_file(filename, H5F_ACC_RDONLY);

    auto image_dataset = input_file.openDataSet("image");
    image_dataset.read(
        image_buffer, H5::PredType::NATIVE_UINT16,
        b_i_space, f_i_space);

    auto pulse_id_dataset = input_file.openDataSet("pulse_id");
    pulse_id_dataset.read(
        metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64,
        b_m_space, f_m_space);

    auto frame_id_dataset = input_file.openDataSet("frame_id");
    frame_id_dataset.read(
        metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64,
        b_m_space, f_m_space);

    auto daq_rec_dataset = input_file.openDataSet("daq_rec");
    daq_rec_dataset.read(
        metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32,
        b_m_space, f_m_space);

    auto received_packets_dataset =
        input_file.openDataSet("received_packets");
    received_packets_dataset.read(
        metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16,
        b_m_space, f_m_space);

    input_file.close();
}
// Stream buffered detector frames from HDF5 files over a ZMQ PUSH socket.
//
// Iterates over the configured path_suffixes, reading frames in blocks of
// REPLAY_READ_BLOCK_SIZE from "<device>/<channel_name>/<suffix>" and sending
// one (ModuleFrame metadata, image data) multi-part message per pulse,
// starting at start_pulse_id.
//
// socket          Connected ZMQ PUSH socket the frames are sent on.
// device          Detector name; first path component of the data files.
// channel_name    Module channel (e.g. M00-M31); second path component.
// source_id       Module index, embedded into each ModuleFrame.
// start_pulse_id  First pulse_id to send; earlier frames in the same
//                 FILE_MOD-aligned file are read but skipped.
//
// Throws runtime_error if a frame's pulse_id does not match the expected
// consecutive pulse_id.
void sf_live (
        void* socket,
        const string& device,
        const string& channel_name,
        const uint16_t source_id,
        const uint64_t start_pulse_id)
{
    auto metadata_buffer = make_unique<FileBufferMetadata>();

    // NOTE(review): the buffer is sized for LIVE_READ_BLOCK_SIZE frames, but
    // load_data_from_file fills REPLAY_READ_BLOCK_SIZE frames per call.
    // Confirm LIVE_READ_BLOCK_SIZE >= REPLAY_READ_BLOCK_SIZE, otherwise this
    // allocation overflows.
    auto image_buffer = make_unique<uint16_t[]>(
            LIVE_READ_BLOCK_SIZE * MODULE_N_PIXELS);

    // Align the starting pulse_id down to a FILE_MOD boundary: each file
    // holds FILE_MOD consecutive pulses, so reading always begins at the
    // start of a file.
    uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD;
    base_pulse_id *= core_buffer::FILE_MOD;
    size_t current_pulse_id = base_pulse_id;

    string filename_base = device + "/" + channel_name + "/";

    for (const auto& filename_suffix:path_suffixes) {
        string filename = filename_base + filename_suffix.path;

        #ifdef DEBUG_OUTPUT
            using namespace date;
            using namespace chrono;
            cout << "[" << system_clock::now() << "]";
            cout << "[sf_live::sf_live]";
            cout << " Reading from filename " << filename << endl;
        #endif

        for (size_t file_index_offset=0;
             file_index_offset < FILE_MOD;
             file_index_offset += REPLAY_READ_BLOCK_SIZE)
        {
            auto start_time = chrono::steady_clock::now();

            load_data_from_file(
                    metadata_buffer.get(),
                    (char*)(image_buffer.get()),
                    filename,
                    file_index_offset);

            auto end_time = chrono::steady_clock::now();
            auto ms_duration = chrono::duration_cast<chrono::milliseconds>(
                    end_time-start_time).count();
            // NOTE(review): stat key still says "sf_replay" — kept byte-for-byte
            // in case monitoring parses it; rename together with its consumers.
            cout << "sf_replay:batch_read_ms " << ms_duration << endl;

            for (size_t i_frame=0; i_frame < REPLAY_READ_BLOCK_SIZE; i_frame++) {

                ModuleFrame module_frame = {
                        metadata_buffer->pulse_id[i_frame],
                        metadata_buffer->frame_index[i_frame],
                        metadata_buffer->daq_rec[i_frame],
                        metadata_buffer->n_received_packets[i_frame],
                        source_id
                };

                // Skip frames that precede the requested starting pulse
                // (the file read always starts at a FILE_MOD boundary).
                if (current_pulse_id < start_pulse_id) {
                    current_pulse_id++;
                    continue;
                }

                // Files are expected to contain strictly consecutive pulses.
                if (current_pulse_id != module_frame.pulse_id) {
                    stringstream err_msg;

                    using namespace date;
                    using namespace chrono;
                    err_msg << "[" << system_clock::now() << "]";
                    err_msg << "[sf_live::sf_live]";
                    err_msg << " Read unexpected pulse_id. ";
                    err_msg << " Expected " << current_pulse_id;
                    err_msg << " received " << module_frame.pulse_id;
                    err_msg << endl;

                    throw runtime_error(err_msg.str());
                }

                // Metadata part first, flagged SNDMORE; image part follows.
                zmq_send(socket,
                         &module_frame,
                         sizeof(ModuleFrame),
                         ZMQ_SNDMORE);

                auto buff_offset = i_frame * MODULE_N_PIXELS;
                zmq_send(socket,
                         (char*)(image_buffer.get() + buff_offset),
                         MODULE_N_BYTES,
                         0);

                #ifdef DEBUG_OUTPUT
                    using namespace date;
                    using namespace chrono;
                    cout << "[" << system_clock::now() << "]";
                    cout << "[sf_live::sf_live]";
                    cout << " Sent pulse_id ";
                    cout << current_pulse_id << endl;
                #endif

                current_pulse_id++;
            }
        }
    }
}
// Entry point: parse command-line arguments, create a ZMQ PUSH socket
// connected to an ipc endpoint derived from source_id, and stream the
// requested module's data via sf_live.
//
// Arguments: device, channel_name, source_id, start_pulse_id.
// Exits with -1 and prints usage on wrong argument count; throws
// runtime_error on ZMQ setup failure.
int main (int argc, char *argv[]) {
    // Program name + exactly the 4 documented arguments.
    // (Was argc != 6 — a leftover from the 5-argument sf_replay tool; only
    // argv[1..4] are ever read below.)
    if (argc != 5) {
        cout << endl;
        cout << "Usage: sf_live [device]";
        cout << " [channel_name] [source_id] [start_pulse_id]";
        cout << endl;
        cout << "\tdevice: Name of detector." << endl;
        cout << "\tchannel_name: M00-M31 for JF16M." << endl;
        cout << "\tsource_id: Module index" << endl;
        cout << "\tstart_pulse_id: First pulse_id to stream." << endl;
        cout << endl;

        exit(-1);
    }

    const string device = string(argv[1]);
    const string channel_name = string(argv[2]);
    const uint16_t source_id = (uint16_t) atoi(argv[3]);
    const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);

    // One ipc endpoint per module, keyed by source_id.
    stringstream ipc_stream;
    ipc_stream << "ipc://sf-live-" << (int)source_id;
    const auto ipc_address = ipc_stream.str();

    #ifdef DEBUG_OUTPUT
        using namespace date;
        using namespace chrono;
        cout << "[" << system_clock::now() << "]";
        cout << "[sf_live::main]";
        cout << " device " << device;
        cout << " channel_name " << channel_name;
        cout << " source_id " << source_id;
        cout << " start_pulse_id " << start_pulse_id;
        cout << " ipc_address " << ipc_address;
        cout << endl;
    #endif

    auto ctx = zmq_ctx_new();
    auto socket = zmq_socket(ctx, ZMQ_PUSH);

    // Cap the send queue at one read block so the reader cannot run
    // arbitrarily far ahead of the consumer.
    const int sndhwm = REPLAY_READ_BLOCK_SIZE;
    if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
        throw runtime_error(strerror (errno));
    // Drop pending messages immediately on close.
    const int linger_ms = 0;
    if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
        throw runtime_error(strerror (errno));

    if (zmq_connect(socket, ipc_address.c_str()) != 0)
        throw runtime_error(strerror (errno));

    sf_live(socket, device, channel_name, source_id, start_pulse_id);

    zmq_close(socket);
    zmq_ctx_destroy(ctx);
}