Merge branch 'generalized_serializer' into 'main'

Generalized serializer

See merge request jungfraujoch/nextgendcu!13
This commit is contained in:
2023-12-11 06:49:24 +01:00
52 changed files with 579 additions and 557 deletions

View File

@@ -32,6 +32,7 @@ Required:
* HDF5 library version 1.10 or newer
* ZeroMQ library
* Google Remote Procedure Call (gRPC) - see notes below
* OpenSSL library
Optional:
* CUDA compiler version 11 or newer - image analysis features won't work without it

View File

@@ -7,7 +7,7 @@
#include "../common/DiffractionExperiment.h"
#include "JFJochBroker.h"
#include "../acquisition_device/AcquisitionDeviceGroup.h"
#include "../common/ZMQPreviewPublisher.h"
#include "../frame_serialize/ZMQPreviewPublisher.h"
DetectorGeometry ParseStandardDetectorGeometry(const nlohmann::json &j);
DetectorGeometry ParseCustomDetectorGeometry(const nlohmann::json &j);

View File

@@ -10,7 +10,7 @@
#include "../grpc/gRPCServer_Template.h"
#include "JFJochBrokerParser.h"
#include "../common/ZMQImagePusher.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
int main (int argc, char **argv) {
if (argc > 3) {
@@ -35,7 +35,7 @@ int main (int argc, char **argv) {
}
std::unique_ptr<JFJochReceiverService> receiver;
std::unique_ptr<ZMQImagePusher> image_pusher;
std::unique_ptr<ZMQStream2Pusher> image_pusher;
std::unique_ptr<ZMQPreviewPublisher> preview_publisher;
ZMQContext context;
@@ -47,7 +47,7 @@ int main (int argc, char **argv) {
ParseAcquisitionDeviceGroup(input, "devices", aq_devices);
if (aq_devices.size() > 0) {
experiment.DataStreams(aq_devices.size());
image_pusher = std::make_unique<ZMQImagePusher>(context, ParseStringArray(input, "zmq_image_addr"));
image_pusher = std::make_unique<ZMQStream2Pusher>(context, ParseStringArray(input, "zmq_image_addr"));
receiver = std::make_unique<JFJochReceiverService>(aq_devices, logger, *image_pusher);
std::string zmq_preview_addr = ParseString(input, "zmq_preview_addr");

View File

@@ -30,31 +30,24 @@ ADD_LIBRARY( CommonFunctions STATIC
Definitions.h
${CMAKE_CURRENT_BINARY_DIR}/GitInfo.cpp GitInfo.h
FrameTransformation.cpp FrameTransformation.h
ZMQWrappers.cpp ZMQWrappers.h
ThreadSafeFIFO.h
ZMQPreviewPublisher.cpp ZMQPreviewPublisher.h
ZMQImagePusher.cpp ZMQImagePusher.h
DiffractionSpot.cpp DiffractionSpot.h
StatusVector.h
ImagePusher.cpp ImagePusher.h
TestImagePusher.cpp TestImagePusher.h
SpotToSave.h
NetworkAddressConvert.h NetworkAddressConvert.cpp
to_fixed.h
DetectorGeometry.cpp DetectorGeometry.h
DetectorModuleGeometry.cpp DetectorModuleGeometry.h
DetectorSetup.h DetectorSetup.cpp ZeroCopyReturnValue.h Histogram.h DiffractionGeometry.h
CUDAWrapper.cpp
CUDAWrapper.h
NUMAHWPolicy.cpp
NUMAHWPolicy.h
ADUHistogram.cpp
ADUHistogram.h
CUDAWrapper.cpp CUDAWrapper.h
NUMAHWPolicy.cpp NUMAHWPolicy.h
ADUHistogram.cpp ADUHistogram.h
RawToConvertedGeometryCore.h
Plot.h
DeviceOutput.h)
DeviceOutput.h
ZMQWrappers.cpp ZMQWrappers.h)
TARGET_LINK_LIBRARIES(CommonFunctions Compression FrameSerialize libzmq JFCalibration -lrt)
TARGET_LINK_LIBRARIES(CommonFunctions Compression JFCalibration libzmq -lrt)
IF (CMAKE_CUDA_COMPILER)
TARGET_SOURCES(CommonFunctions PRIVATE CUDAWrapper.cu )

View File

@@ -12,7 +12,7 @@
#include "UnitCell.h"
#include "Coord.h"
#include "Definitions.h"
#include "../frame_serialize/CBORMessages.h"
#include "../frame_serialize/JFJochMessages.h"
#include "DetectorSetup.h"
#include "../image_analysis/DataProcessingSettings.h"

View File

@@ -34,9 +34,28 @@ FrameTransformation::FrameTransformation(const DiffractionExperiment &in_experim
}
}
size_t FrameTransformation::SaveCompressedImage(void *output) {
return compressor.Compress((char *) output, precompression_buffer.data(),
experiment.GetPixelsNum(), pixel_depth);
CompressedImage FrameTransformation::GetCompressedImage() {
CompressedImage image{};
if (experiment.GetCompressionAlgorithm() == CompressionAlgorithm::NO_COMPRESSION) {
image.data = (uint8_t *) precompression_buffer.data();
image.size = experiment.GetPixelsNum() * experiment.GetPixelDepth();
} else {
compressed_buffer.resize(MaxCompressedSize(experiment.GetCompressionAlgorithm(),
experiment.GetPixelsNum(),
experiment.GetPixelDepth()));
image.data = (uint8_t *) compressed_buffer.data();
image.size = compressor.Compress(compressed_buffer.data(), precompression_buffer.data(),
experiment.GetPixelsNum(), experiment.GetPixelDepth());
}
image.xpixel = experiment.GetXPixelsNum();
image.ypixel = experiment.GetYPixelsNum();
image.algorithm = experiment.GetCompressionAlgorithm();
image.pixel_depth_bytes = experiment.GetPixelDepth();
image.pixel_is_signed = pixel_signed;
image.pixel_is_float = false;
image.channel = "default";
return image;
}
void FrameTransformation::ProcessModule(const void *input, uint16_t module_number, int data_stream) {
@@ -80,6 +99,6 @@ void FrameTransformation::ProcessModule(const void *input, uint16_t module_numbe
}
}
const void *FrameTransformation::GetPreviewImage() const {
const void *FrameTransformation::GetImage() const {
return precompression_buffer.data();
}

View File

@@ -5,19 +5,21 @@
#include "DiffractionExperiment.h"
#include "../compression/JFJochCompressor.h"
#include "../frame_serialize/JFJochMessages.h"
class FrameTransformation {
const DiffractionExperiment& experiment;
JFJochBitShuffleCompressor compressor;
std::vector<char> precompression_buffer;
std::vector<char> compressed_buffer;
const bool pixel_signed;
const size_t pixel_depth;
public:
explicit FrameTransformation(const DiffractionExperiment &experiment);
void ProcessModule(const void *input, uint16_t module_number, int data_stream);
size_t SaveCompressedImage(void *output);
const void *GetPreviewImage() const;
CompressedImage GetCompressedImage();
const void *GetImage() const;
};
#endif //JUNGFRAUJOCH_FRAMETRANSFORMATION_H

View File

@@ -1,29 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_IMAGEPUSHER_H
#define JUNGFRAUJOCH_IMAGEPUSHER_H
#include <cstdint>
#include <vector>
#include "DiffractionExperiment.h"
#include "DiffractionSpot.h"
#include "../frame_serialize/JFJochFrameSerializer.h"
#include "../frame_serialize/CBORMessages.h"
#include "ZeroCopyReturnValue.h"
void PrepareCBORImage(DataMessage& message,
const DiffractionExperiment &experiment,
void *image, size_t image_size);
class ImagePusher {
public:
virtual void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) = 0;
virtual void EndDataCollection(const EndMessage& message) = 0;
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0;
virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) = 0;
};
#endif //JUNGFRAUJOCH_IMAGEPUSHER_H

View File

@@ -1,30 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQIMAGEPUSHER_H
#define JUNGFRAUJOCH_ZMQIMAGEPUSHER_H
#include "ImagePusher.h"
#include "ThreadSafeFIFO.h"
#include "ZMQWrappers.h"
#include "DiffractionSpot.h"
#include "../frame_serialize/JFJochFrameSerializer.h"
class ZMQImagePusher : public ImagePusher {
std::vector<std::unique_ptr<ZMQContext>> contexts;
std::vector<std::unique_ptr<ZMQSocket>> sockets;
int64_t file_count = 1;
public:
ZMQImagePusher(ZMQContext &context, const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
// High performance implementation, where each socket has dedicated ZMQ context
explicit ZMQImagePusher(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) override;
void EndDataCollection(const EndMessage& message) override;
};
#endif //JUNGFRAUJOCH_ZMQIMAGEPUSHER_H

View File

@@ -1,30 +0,0 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "ZMQPreviewPublisher.h"
ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string& addr) :
socket(context, ZMQSocketType::Pub) {
socket.SendWaterMark(2).NoLinger();
socket.Bind(addr);
}
void ZMQPreviewPublisher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride) {
{
std::unique_lock<std::mutex> ul(m);
stride = preview_stride;
current_part = -1;
}
socket.Send(image_data, image_size);
}
void ZMQPreviewPublisher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
{
std::unique_lock<std::mutex> ul(m);
int64_t part = image_number / stride;
if (current_part >= part)
return;
else
current_part = part;
}
socket.Send(image_data, image_size);
}

View File

@@ -1,5 +1,9 @@
// Copyright (2019-2023) Paul Scherrer Institute
// Copyright (2019-2023) Paul Scherrer Institute
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQWRAPPERS_H
#define JUNGFRAUJOCH_ZMQWRAPPERS_H

View File

@@ -56,7 +56,17 @@ size_t JFJochBitShuffleCompressor::CompressBlock(char *dest, const char *source,
return compressed_size + 4;
}
size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size_t nelements, size_t elem_size) {
std::vector<uint8_t> JFJochBitShuffleCompressor::Compress(const void *source, size_t nelements, size_t elem_size) {
std::vector<uint8_t> tmp(MaxCompressedSize(algorithm, nelements, elem_size));
size_t tmp_size = Compress(tmp.data(), source, nelements, elem_size);
tmp.resize(tmp_size);
return tmp;
}
size_t JFJochBitShuffleCompressor::Compress(void *dest, const void *source, size_t nelements, size_t elem_size) {
auto c_dest = (char *) dest;
auto c_source = (char *) source;
static_assert(DefaultBlockSize % BSHUF_BLOCKED_MULT == 0, "Block size must be multiple of 8");
if (algorithm == CompressionAlgorithm::NO_COMPRESSION) {
@@ -65,8 +75,8 @@ size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size
return nelements * elem_size;
}
bshuf_write_uint64_BE(dest, nelements * elem_size);
bshuf_write_uint32_BE(dest + 8, DefaultBlockSize * elem_size);
bshuf_write_uint64_BE(c_dest, nelements * elem_size);
bshuf_write_uint32_BE(c_dest + 8, DefaultBlockSize * elem_size);
if (tmp_space.size() < DefaultBlockSize * elem_size)
tmp_space.resize(DefaultBlockSize * elem_size);
@@ -75,18 +85,19 @@ size_t JFJochBitShuffleCompressor::Compress(char *dest, const char *source, size
size_t reminder_size = nelements - num_full_blocks * DefaultBlockSize;
size_t compressed_size = 12;
for (int i = 0; i < num_full_blocks; i++)
compressed_size += CompressBlock(dest + compressed_size,
source + i * DefaultBlockSize * elem_size, DefaultBlockSize, elem_size);
compressed_size += CompressBlock(c_dest + compressed_size,
c_source + i * DefaultBlockSize * elem_size, DefaultBlockSize, elem_size);
size_t last_block_size = reminder_size - reminder_size % BSHUF_BLOCKED_MULT;
if (last_block_size > 0)
compressed_size += CompressBlock(dest + compressed_size,
source + num_full_blocks * DefaultBlockSize * elem_size, last_block_size, elem_size);
compressed_size += CompressBlock(c_dest + compressed_size,
c_source + num_full_blocks * DefaultBlockSize * elem_size, last_block_size, elem_size);
size_t leftover_bytes = (reminder_size % BSHUF_BLOCKED_MULT) * elem_size;
if (leftover_bytes > 0) {
memcpy(dest + compressed_size, source + (num_full_blocks * DefaultBlockSize + last_block_size) * elem_size, leftover_bytes);
memcpy(c_dest + compressed_size, c_source + (num_full_blocks * DefaultBlockSize + last_block_size) * elem_size, leftover_bytes);
compressed_size += leftover_bytes;
}
return compressed_size;

View File

@@ -31,13 +31,10 @@ public:
template<class T>
std::vector<uint8_t> Compress(const std::vector<T> &src) {
std::vector<uint8_t> tmp(MaxCompressedSize(algorithm, src.size(), sizeof(T)));
size_t tmp_size = Compress(tmp.data(), src);
tmp.resize(tmp_size);
return tmp;
return Compress(src.data(), src.size(), sizeof(T));
}
size_t Compress(char *dest, const char* source, size_t nelements, size_t elem_size);
std::vector<uint8_t> Compress(const void* source, size_t nelements, size_t elem_size);
size_t Compress(void *dest, const void* source, size_t nelements, size_t elem_size);
private:
char scratch[DefaultBlockSize * sizeof(uint64_t)];
};

View File

@@ -20,10 +20,15 @@ extern "C" {
template <class Td, class Ts>
void JFJochDecompress(std::vector<Td> &output, CompressionAlgorithm algorithm, std::vector<Ts> source_v,
size_t nelements) {
JFJochDecompress<Td, Ts>(output, algorithm, source_v.data(), source_v.size() * sizeof(Ts), nelements);
}
template <class Td, class Ts>
void JFJochDecompress(std::vector<Td> &output, CompressionAlgorithm algorithm, Ts *source_v, size_t source_size,
size_t nelements) {
size_t elem_size = sizeof(Td);
output.resize(nelements * elem_size);
size_t source_size = source_v.size() * sizeof(Ts);
auto source = (uint8_t *) source_v.data();
auto source = (uint8_t *) source_v;
size_t block_size;
if (algorithm != CompressionAlgorithm::NO_COMPRESSION) {
@@ -55,5 +60,4 @@ void JFJochDecompress(std::vector<Td> &output, CompressionAlgorithm algorithm, s
}
}
#endif //JUNGFRAUJOCH_JFJOCHDECOMPRESS_H

View File

@@ -1,6 +1,6 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "JFJochFrameDeserializer.h"
#include "CBORStream2Deserializer.h"
#include "tinycbor/src/cbor.h"
#include "CborErr.h"
#include "CborUtil.h"
@@ -185,7 +185,7 @@ inline void GetCBORUInt64Array(CborValue &value, std::vector<uint64_t> &v) {
memcpy(v.data(), ptr, len);
}
inline void GetCBORTypedArray(CBORImage &v, CborValue &value) {
inline void GetCBORTypedArray(CompressedImage &v, CborValue &value) {
CborTag tag = GetCBORTag(value);
switch (tag) {
@@ -264,7 +264,7 @@ inline void GetCBORTypedArray(CBORImage &v, CborValue &value) {
throw JFJochException(JFJochExceptionCategory::CBORError, "Byte string or compressed array expected");
}
void GetCBORMultidimTypedArray(CBORImage &v, CborValue &value) {
void GetCBORMultidimTypedArray(CompressedImage &v, CborValue &value) {
if (GetCBORTag(value) != TagMultiDimArray)
throw JFJochException(JFJochExceptionCategory::CBORError, "Multidim array expected");
@@ -294,7 +294,7 @@ bool CheckMagicNumber(CborValue &v) {
}
}
void JFJochFrameDeserializer::GetCBORSpots(CborValue &value) {
void CBORStream2Deserializer::GetCBORSpots(CborValue &value) {
size_t array_len = GetCBORArrayLen(value);
if (array_len % 4 != 0)
@@ -315,7 +315,7 @@ void JFJochFrameDeserializer::GetCBORSpots(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &array_value));
}
void JFJochFrameDeserializer::DecodeType(CborValue &value) {
void CBORStream2Deserializer::DecodeType(CborValue &value) {
if (cbor_value_at_end(&value))
throw JFJochException(JFJochExceptionCategory::CBORError, "Message empty");
auto key = GetCBORString(value);
@@ -337,7 +337,7 @@ void JFJochFrameDeserializer::DecodeType(CborValue &value) {
throw JFJochException(JFJochExceptionCategory::CBORError, "Unknown message type");
}
void JFJochFrameDeserializer::ProcessGoniometerMap(CborValue &value) {
void CBORStream2Deserializer::ProcessGoniometerMap(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
@@ -348,7 +348,7 @@ void JFJochFrameDeserializer::ProcessGoniometerMap(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
GoniometerAxis JFJochFrameDeserializer::ProcessGoniometer(CborValue &value) {
GoniometerAxis CBORStream2Deserializer::ProcessGoniometer(CborValue &value) {
CborValue map_value;
GoniometerAxis ret{};
@@ -366,7 +366,7 @@ GoniometerAxis JFJochFrameDeserializer::ProcessGoniometer(CborValue &value) {
return ret;
}
void JFJochFrameDeserializer::ProcessDetTranslation(CborValue &value) {
void CBORStream2Deserializer::ProcessDetTranslation(CborValue &value) {
if (GetCBORArrayLen(value) != 3)
throw JFJochException(JFJochExceptionCategory::CBORError, "Array with 3 floats expected");
CborValue array_value;
@@ -377,7 +377,7 @@ void JFJochFrameDeserializer::ProcessDetTranslation(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &array_value));
}
void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &value) {
void CBORStream2Deserializer::ProcessImageMessageUserDataElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
@@ -413,7 +413,7 @@ void JFJochFrameDeserializer::ProcessImageMessageUserDataElement(CborValue &valu
cborErr(cbor_value_leave_container(&value, &map_value));
}
bool JFJochFrameDeserializer::ProcessImageMessageElement(CborValue &value) {
bool CBORStream2Deserializer::ProcessImageMessageElement(CborValue &value) {
if (cbor_value_at_end(&value))
return false;
else {
@@ -443,12 +443,12 @@ bool JFJochFrameDeserializer::ProcessImageMessageElement(CborValue &value) {
}
}
void JFJochFrameDeserializer::ProcessCalibration(CborValue &value) {
void CBORStream2Deserializer::ProcessCalibration(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
while (! cbor_value_at_end(&map_value)) {
auto key = GetCBORString(map_value);
CBORImage image;
CompressedImage image;
image.channel = key;
GetCBORMultidimTypedArray(image, map_value);
start_message.calibration.push_back(image);
@@ -457,12 +457,12 @@ void JFJochFrameDeserializer::ProcessCalibration(CborValue &value) {
}
void JFJochFrameDeserializer::ProcessPixelMaskElement(CborValue &value) {
void CBORStream2Deserializer::ProcessPixelMaskElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
while (! cbor_value_at_end(&map_value)) {
auto key = GetCBORString(map_value);
CBORImage image;
CompressedImage image;
image.channel = key;
GetCBORMultidimTypedArray(image, map_value);
start_message.pixel_mask.push_back(image);
@@ -470,7 +470,7 @@ void JFJochFrameDeserializer::ProcessPixelMaskElement(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
void JFJochFrameDeserializer::ProcessRadIntResultElement(CborValue &value) {
void CBORStream2Deserializer::ProcessRadIntResultElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
while (! cbor_value_at_end(&map_value)) {
@@ -482,7 +482,7 @@ void JFJochFrameDeserializer::ProcessRadIntResultElement(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
void JFJochFrameDeserializer::ProcessADUHistogramElement(CborValue &value) {
void CBORStream2Deserializer::ProcessADUHistogramElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
while (! cbor_value_at_end(&map_value)) {
@@ -494,7 +494,7 @@ void JFJochFrameDeserializer::ProcessADUHistogramElement(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
void JFJochFrameDeserializer::ProcessChannels(CborValue &value) {
void CBORStream2Deserializer::ProcessChannels(CborValue &value) {
if (!cbor_value_is_array(&value))
throw JFJochException(JFJochExceptionCategory::CBORError, "Array expected");
CborValue array_value;
@@ -506,7 +506,7 @@ void JFJochFrameDeserializer::ProcessChannels(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &array_value));
}
void JFJochFrameDeserializer::ProcessUnitCellElement(CborValue &value) {
void CBORStream2Deserializer::ProcessUnitCellElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
while (! cbor_value_at_end(&map_value)) {
@@ -530,7 +530,7 @@ void JFJochFrameDeserializer::ProcessUnitCellElement(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
void JFJochFrameDeserializer::ProcessStartMessageUserDataElement(CborValue &value) {
void CBORStream2Deserializer::ProcessStartMessageUserDataElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
@@ -600,7 +600,7 @@ void JFJochFrameDeserializer::ProcessStartMessageUserDataElement(CborValue &valu
cborErr(cbor_value_leave_container(&value, &map_value));
}
bool JFJochFrameDeserializer::ProcessStartMessageElement(CborValue &value) {
bool CBORStream2Deserializer::ProcessStartMessageElement(CborValue &value) {
if (cbor_value_at_end(&value))
return false;
else {
@@ -664,7 +664,7 @@ bool JFJochFrameDeserializer::ProcessStartMessageElement(CborValue &value) {
}
}
void JFJochFrameDeserializer::ProcessEndMessageUserDataElement(CborValue &value) {
void CBORStream2Deserializer::ProcessEndMessageUserDataElement(CborValue &value) {
CborValue map_value;
cborErr(cbor_value_enter_container(&value, &map_value));
@@ -695,7 +695,7 @@ void JFJochFrameDeserializer::ProcessEndMessageUserDataElement(CborValue &value)
cborErr(cbor_value_leave_container(&value, &map_value));
}
bool JFJochFrameDeserializer::ProcessEndMessageElement(CborValue &value) {
bool CBORStream2Deserializer::ProcessEndMessageElement(CborValue &value) {
if (cbor_value_at_end(&value))
return false;
else {
@@ -714,7 +714,7 @@ bool JFJochFrameDeserializer::ProcessEndMessageElement(CborValue &value) {
}
}
void JFJochFrameDeserializer::ProcessImageData(CborValue &value) {
void CBORStream2Deserializer::ProcessImageData(CborValue &value) {
if (!cbor_value_is_map(&value))
throw JFJochException(JFJochExceptionCategory::CBORError, "Map expected");
@@ -732,11 +732,11 @@ void JFJochFrameDeserializer::ProcessImageData(CborValue &value) {
cborErr(cbor_value_leave_container(&value, &map_value));
}
void JFJochFrameDeserializer::Process(const std::vector<uint8_t> &buffer) {
void CBORStream2Deserializer::Process(const std::vector<uint8_t> &buffer) {
Process(buffer.data(), buffer.size());
}
void JFJochFrameDeserializer::Process(const uint8_t *msg, size_t msg_size) {
void CBORStream2Deserializer::Process(const uint8_t *msg, size_t msg_size) {
std::unique_lock<std::mutex> ul(m);
data_message = DataMessage();
@@ -777,22 +777,22 @@ std::unique_lock<std::mutex> ul(m);
"Serialized frame must be map in top level");
}
DataMessage JFJochFrameDeserializer::GetDataMessage() const {
DataMessage CBORStream2Deserializer::GetDataMessage() const {
std::unique_lock<std::mutex> ul(m);
return data_message;
}
JFJochFrameDeserializer::Type JFJochFrameDeserializer::GetType() const {
CBORStream2Deserializer::Type CBORStream2Deserializer::GetType() const {
std::unique_lock<std::mutex> ul(m);
return msg_type;
}
EndMessage JFJochFrameDeserializer::GetEndMessage() const {
EndMessage CBORStream2Deserializer::GetEndMessage() const {
std::unique_lock<std::mutex> ul(m);
return end_message;
}
StartMessage JFJochFrameDeserializer::GetStartMessage() const {
StartMessage CBORStream2Deserializer::GetStartMessage() const {
std::unique_lock<std::mutex> ul(m);
return start_message;
}

View File

@@ -1,7 +1,7 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H
#define JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H
#ifndef JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H
#define JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H
#include <cstdint>
#include <vector>
@@ -10,10 +10,10 @@
#include "../common/SpotToSave.h"
#include "tinycbor/src/cbor.h"
#include "CBORMessages.h"
#include "JFJochMessages.h"
#include <mutex>
class JFJochFrameDeserializer {
class CBORStream2Deserializer {
public:
enum class Type {START, END, IMAGE};
private:
@@ -58,4 +58,4 @@ public:
};
#endif //JUNGFRAUJOCH_JFJOCHFRAMEDESERIALIZER_H
#endif //JUNGFRAUJOCH_CBORSTREAM2DESERIALIZER_H

View File

@@ -1,6 +1,6 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "JFJochFrameSerializer.h"
#include "CBORStream2Serializer.h"
#include "tinycbor/src/cbor.h"
#include "CborErr.h"
#include "CborUtil.h"
@@ -77,7 +77,7 @@ void CBOR_ENC_COMPRESSED(CborEncoder &encoder,
}
}
inline void CBOR_ENC_2D_TYPED_ARRAY(CborEncoder &encoder, const CBORImage& image) {
inline void CBOR_ENC_2D_TYPED_ARRAY(CborEncoder &encoder, const CompressedImage& image) {
//if ((algorithm == CompressionAlgorithm::NO_COMPRESSION) && (xpixel * ypixel != image_size / elem_size))
// throw JFJochException(JFJochExceptionCategory::CBORError, "Mismatch in array size");
@@ -182,7 +182,7 @@ inline void CBOR_ENC(CborEncoder &encoder, const char* key, const std::vector<Sp
cborErr(cbor_encoder_close_container(&encoder, &arrayEncoder));
}
inline void CBOR_ENC(CborEncoder &encoder, const char* key, const CBORImage& message) {
inline void CBOR_ENC(CborEncoder &encoder, const char* key, const CompressedImage& message) {
CborEncoder mapEncoder;
cborErr(cbor_encode_text_stringz(&encoder, key));
@@ -250,7 +250,7 @@ inline void CBOR_ENC_UNIT_CELL(CborEncoder &encoder, const char* key, const floa
cborErr(cbor_encoder_close_container(&encoder, &mapEncoder));
}
inline void CBOR_ENC(CborEncoder &encoder, const char* key, const std::vector<CBORImage> &v) {
inline void CBOR_ENC(CborEncoder &encoder, const char* key, const std::vector<CompressedImage> &v) {
CborEncoder mapEncoder;
cborErr(cbor_encode_text_stringz(&encoder, key));
@@ -306,40 +306,14 @@ inline void CBOR_ENC_USER_DATA(CborEncoder &encoder, const StartMessage& message
cborErr(cbor_encoder_close_container(&encoder, &mapEncoder));
}
JFJochFrameSerializer::JFJochFrameSerializer(uint8_t *in_buffer, size_t buffer_size) :
CBORStream2Serializer::CBORStream2Serializer(uint8_t *in_buffer, size_t buffer_size) :
buffer(in_buffer), max_buffer_size(buffer_size), curr_size(0) {}
size_t JFJochFrameSerializer::GetImageAppendOffset() {
return curr_size + sizeof(size_t);
}
size_t JFJochFrameSerializer::GetBufferSize() const {
size_t CBORStream2Serializer::GetBufferSize() const {
return curr_size;
}
size_t JFJochFrameSerializer::GetRemainingBuffer() const {
return max_buffer_size - curr_size;
}
void JFJochFrameSerializer::AppendImage(size_t image_size) {
if (curr_size + image_size + sizeof(size_t) >= max_buffer_size)
throw JFJochException(JFJochExceptionCategory::CBORError, "No space to extend the image");
curr_size--;
buffer[curr_size] = 0x40 | 27;
curr_size++;
#ifdef LITTLE_ENDIAN
size_t image_size_be = __builtin_bswap64(image_size);
#else
size_t image_size_be = image_size;
#endif
memcpy(buffer + curr_size, &image_size_be, sizeof(size_t));
curr_size += sizeof(size_t);
curr_size += image_size;
}
void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message) {
void CBORStream2Serializer::SerializeSequenceStart(const StartMessage& message) {
CborEncoder encoder, mapEncoder;
cbor_encoder_init(&encoder, buffer, max_buffer_size, 0);
@@ -387,7 +361,7 @@ void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message)
curr_size = cbor_encoder_get_buffer_size(&encoder, buffer);
}
void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) {
void CBORStream2Serializer::SerializeSequenceEnd(const EndMessage& message) {
CborEncoder encoder, mapEncoder, userDataMapEncoder;
cbor_encoder_init(&encoder, buffer, max_buffer_size, 0);
cborErr(cbor_encode_tag(&encoder, CborSignatureTag));
@@ -415,7 +389,7 @@ void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) {
curr_size = cbor_encoder_get_buffer_size(&encoder, buffer);
}
void JFJochFrameSerializer::SerializeImage(const DataMessage& message) {
void CBORStream2Serializer::SerializeImage(const DataMessage& message) {
CborEncoder encoder, mapEncoder, userDataMapEncoder;
cbor_encoder_init(&encoder, buffer, max_buffer_size, 0);

View File

@@ -1,28 +1,25 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H
#define JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H
#ifndef JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H
#define JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H
#include <vector>
#include <cstdint>
#include <cstddef>
#include "../common/SpotToSave.h"
#include "CBORMessages.h"
#include "JFJochMessages.h"
class JFJochFrameSerializer {
class CBORStream2Serializer {
uint8_t *buffer = nullptr;
size_t max_buffer_size;
size_t curr_size;
public:
explicit JFJochFrameSerializer(uint8_t *buffer, size_t buffer_size);
[[nodiscard]] size_t GetImageAppendOffset();
explicit CBORStream2Serializer(uint8_t *buffer, size_t buffer_size);
[[nodiscard]] size_t GetBufferSize() const;
[[nodiscard]] size_t GetRemainingBuffer() const;
void SerializeSequenceStart(const StartMessage& message);
void SerializeSequenceEnd(const EndMessage& message);
void SerializeImage(const DataMessage& message);
void AppendImage(size_t image_size);
};
#endif //JUNGFRAUJOCH_JFJOCHFRAMESERIALIZER_H
#endif //JUNGFRAUJOCH_CBORSTREAM2SERIALIZER_H

View File

@@ -1,6 +1,6 @@
ADD_LIBRARY(FrameSerialize STATIC
JFJochFrameSerializer.cpp JFJochFrameSerializer.h
JFJochFrameDeserializer.cpp JFJochFrameDeserializer.h
ADD_LIBRARY(CBORStream2FrameSerialize STATIC
CBORStream2Serializer.cpp CBORStream2Serializer.h
CBORStream2Deserializer.cpp CBORStream2Deserializer.h
tinycbor/src/cborparser_dup_string.c
tinycbor/src/cborencoder.c
tinycbor/src/cborencoder_close_container_checked.c
@@ -10,4 +10,16 @@ ADD_LIBRARY(FrameSerialize STATIC
tinycbor/src/cborpretty.c
tinycbor/src/cborerrorstrings.c
tinycbor/src/cbor.h
tinycbor/src/tinycbor-version.h CborErr.h CborUtil.h CBORMessages.h)
tinycbor/src/tinycbor-version.h CborErr.h CborUtil.h JFJochMessages.h)
ADD_LIBRARY(ImagePusher STATIC
ImagePusher.cpp ImagePusher.h
TestImagePusher.cpp TestImagePusher.h
ZMQStream2Pusher.cpp ZMQStream2Pusher.h
ZMQPreviewPublisher.cpp ZMQPreviewPublisher.h
ZMQBsreadImagePusher.cpp
ZMQBsreadImagePusher.h)
FIND_PACKAGE(OpenSSL REQUIRED)
TARGET_LINK_LIBRARIES(ImagePusher CBORStream2FrameSerialize CommonFunctions Compression OpenSSL::Crypto)

View File

@@ -0,0 +1,27 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_IMAGEPUSHER_H
#define JUNGFRAUJOCH_IMAGEPUSHER_H
#include <cstdint>
#include <vector>
#include "../common/DiffractionExperiment.h"
#include "../common/DiffractionSpot.h"
#include "CBORStream2Serializer.h"
#include "JFJochMessages.h"
#include "../common/ZeroCopyReturnValue.h"
void PrepareCBORImage(DataMessage& message,
const DiffractionExperiment &experiment,
void *image, size_t image_size);
class ImagePusher {
public:
virtual void StartDataCollection(const StartMessage& message) = 0;
virtual void EndDataCollection(const EndMessage& message) = 0;
virtual void SendImage(const DataMessage& message) = 0;
};
#endif //JUNGFRAUJOCH_IMAGEPUSHER_H

View File

@@ -1,7 +1,7 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_CBORMESSAGES_H
#define JUNGFRAUJOCH_CBORMESSAGES_H
#ifndef JUNGFRAUJOCH_JFJOCHMESSAGES_H
#define JUNGFRAUJOCH_JFJOCHMESSAGES_H
#include <string>
#include <cstdint>
@@ -13,7 +13,7 @@
constexpr const uint64_t user_data_release = 1;
constexpr const uint64_t user_data_magic_number = 0x52320000UL | user_data_release;
struct CBORImage {
struct CompressedImage {
const uint8_t *data;
size_t size; // Including compression
size_t xpixel;
@@ -27,7 +27,7 @@ struct CBORImage {
struct DataMessage {
int64_t number = INT64_MIN;
CBORImage image;
CompressedImage image;
std::vector<SpotToSave> spots;
std::vector<float> rad_int_profile;
uint64_t indexing_result; // 0 - not tried, 1 - tried and failed, 2 - tried and success
@@ -121,18 +121,18 @@ struct StartMessage {
std::vector<float> rad_int_bin_to_q;
std::vector<float> rad_int_solid_angle_corr;
std::vector<CBORImage> pixel_mask;
std::vector<CBORImage> calibration;
std::vector<CompressedImage> pixel_mask;
std::vector<CompressedImage> calibration;
size_t approx_size = 1024*1024;
// Use function below to update approx_size
void AddPixelMask(CBORImage image) {
void AddPixelMask(CompressedImage image) {
approx_size += image.size;
pixel_mask.emplace_back(std::move(image));
}
void AddCalibration(CBORImage image) {
void AddCalibration(CompressedImage image) {
approx_size += image.size;
calibration.emplace_back(std::move(image));
}
@@ -155,4 +155,4 @@ struct EndMessage {
uint64_t adu_histogram_bin_width;
};
#endif //JUNGFRAUJOCH_CBORMESSAGES_H
#endif //JUNGFRAUJOCH_JFJOCHMESSAGES_H

View File

@@ -2,15 +2,15 @@
#include "TestImagePusher.h"
#include "../tests/FPGAUnitTest.h"
#include "JFJochCompressor.h"
#include "../compression/JFJochCompressor.h"
#include "../compression/JFJochDecompress.h"
#include "../frame_serialize/JFJochFrameDeserializer.h"
#include "CBORStream2Deserializer.h"
TestImagePusher::TestImagePusher(int64_t image_number) {
image_id = image_number;
}
void TestImagePusher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) {
void TestImagePusher::StartDataCollection(const StartMessage& message) {
std::unique_lock<std::mutex> ul(m);
if (is_running)
@@ -28,37 +28,16 @@ void TestImagePusher::EndDataCollection(const EndMessage& message) {
is_running = false;
}
void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
void TestImagePusher::SendImage(const DataMessage& message) {
std::unique_lock<std::mutex> ul(m);
frame_counter++;
if (image_number == image_id) {
JFJochFrameDeserializer deserializer;
deserializer.Process(image_data, image_size);
auto image_array = deserializer.GetDataMessage();
receiver_generated_image.resize(image_array.image.size);
memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size);
if (message.number == image_id) {
receiver_generated_image.resize(message.image.size);
memcpy(receiver_generated_image.data(), message.image.data, message.image.size);
}
}
void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
std::unique_lock<std::mutex> ul(m);
frame_counter++;
if (image_number == image_id) {
JFJochFrameDeserializer deserializer;
deserializer.Process(image_data, image_size);
auto image_array = deserializer.GetDataMessage();
receiver_generated_image.resize(image_array.image.size);
memcpy(receiver_generated_image.data(), image_array.image.data, image_array.image.size);
}
z->release();
}
bool TestImagePusher::CheckSequence() const {
std::unique_lock<std::mutex> ul(m);
return correct_sequence;

View File

@@ -6,8 +6,8 @@
#include <mutex>
#include "ImagePusher.h"
#include "Logger.h"
#include "DiffractionExperiment.h"
#include "../common/Logger.h"
#include "../common/DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
#include "../jungfrau/JFModuleGainCalibration.h"
@@ -19,11 +19,9 @@ class TestImagePusher : public ImagePusher {
bool is_running = false;
size_t frame_counter = 0;
public:
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) override;
void SendImage(const DataMessage& message) override;
explicit TestImagePusher(int64_t image_number);
void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) override;
void StartDataCollection(const StartMessage& message) override;
void EndDataCollection(const EndMessage& message) override;
bool CheckImage(const DiffractionExperiment &x,
const std::vector<uint16_t> &raw_reference_image,

View File

@@ -0,0 +1,81 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "ZMQBsreadImagePusher.h"
#include <nlohmann/json.hpp>
#include <openssl/md5.h>
ZMQBsreadImagePusher::ZMQBsreadImagePusher(const std::string &addr, int32_t send_buffer_high_watermark,
int32_t send_buffer_size) : socket(context, ZMQSocketType::Push) {
if (send_buffer_size > 0)
socket.SendBufferSize(send_buffer_size);
if (send_buffer_high_watermark > 0)
socket.SendWaterMark(send_buffer_high_watermark);
socket.Bind(addr);
}
void ZMQBsreadImagePusher::StartDataCollection(const StartMessage &message) {
// Do nothing
}
void ZMQBsreadImagePusher::SendImage(const DataMessage &message) {
std::unique_lock<std::mutex> ul(m);
timespec ts{};
clock_gettime(CLOCK_REALTIME, &ts);
auto data_header = DataHeader(message);
auto main_header = MainHeader(message, data_header, ts);
socket.Send(main_header, true, true);
socket.Send(data_header, true, true);
socket.Send(message.image.data, message.image.size, true, true);
socket.Send(&ts, sizeof(ts), true, false);
}
void ZMQBsreadImagePusher::EndDataCollection(const EndMessage &message) {
// Do nothing
}
std::string ZMQBsreadImagePusher::MainHeader(const DataMessage &message, const std::string &data_header, timespec &ts) {
nlohmann::json j;
j["htype"] = "bsr_m-1.1";
j["pulse_id"] = message.bunch_id;
j["hash"] = MD5Hash(data_header);
j["global_timestamp"]["sec"] = ts.tv_sec;
j["global_timestamp"]["ns"] = ts.tv_nsec;
return j.dump();
}
std::string ZMQBsreadImagePusher::MD5Hash(const std::string &s) {
std::string ret;
uint8_t hash[MD5_DIGEST_LENGTH];
MD5(reinterpret_cast<const unsigned char *>(s.data()), s.length(), hash);
for (unsigned char i : hash) {
char buf[4];
sprintf(buf, "%02x", i);
ret.append(buf);
}
return ret;
}
std::string ZMQBsreadImagePusher::DataHeader(const DataMessage &message) {
nlohmann::json data;
data["htype"] = "bsr_d-1.1";
nlohmann::json image_channel;
image_channel["name"] = "JFJOCH:IMAGE";
if (message.image.pixel_depth_bytes == 2)
image_channel["type"] = (message.image.pixel_is_signed ? "int16" : "uint16");
else
image_channel["type"] = (message.image.pixel_is_signed ? "int32" : "uint32");
image_channel["shape"] = {message.image.ypixel, message.image.xpixel};
image_channel["encoding"] = "little";
if (message.image.algorithm == CompressionAlgorithm::BSHUF_LZ4)
image_channel["compression"] = "bitshuffle_lz4";
else if (message.image.algorithm == CompressionAlgorithm::NO_COMPRESSION)
image_channel["compression"] = "none";
else
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Compression not allowed for BSRead data");
image_channel["modulo"] = 1;
image_channel["offset"] = 0;
data["channels"].push_back(image_channel);
return data.dump();
}

View File

@@ -0,0 +1,24 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H
#define JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H
#include "ImagePusher.h"
#include "../common/ZMQWrappers.h"
class ZMQBsreadImagePusher : public ImagePusher {
std::mutex m;
ZMQContext context;
ZMQSocket socket;
static std::string MainHeader(const DataMessage &message, const std::string &data_header, timespec &ts);
static std::string MD5Hash(const std::string& s);
static std::string DataHeader(const DataMessage &message);
public:
ZMQBsreadImagePusher(const std::string& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
void StartDataCollection(const StartMessage &message) override;
void SendImage(const DataMessage &message) override;
void EndDataCollection(const EndMessage &message) override;
};
#endif //JUNGFRAUJOCH_ZMQBSREADIMAGEPUSHER_H

View File

@@ -0,0 +1,51 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "ZMQPreviewPublisher.h"
#include "CBORStream2Serializer.h"
ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string& addr) :
socket(context, ZMQSocketType::Pub) {
socket.SendWaterMark(2).NoLinger();
socket.Bind(addr);
}
void ZMQPreviewPublisher::SetPreviewStride(int64_t preview_stride) {
std::unique_lock<std::mutex> ul(m);
stride = preview_stride;
}
void ZMQPreviewPublisher::StartDataCollection(const StartMessage& message) {
std::unique_lock<std::mutex> ul(m);
current_part = -1;
size_t approx_size = 1024*1024;
for (const auto &x : message.pixel_mask)
approx_size += x.size;
for (const auto &x : message.calibration)
approx_size += x.size;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(message);
socket.Send(serialization_buffer.data(), serializer.GetBufferSize());
}
void ZMQPreviewPublisher::SendImage(const DataMessage& message) {
{
std::unique_lock<std::mutex> ul(m);
int64_t part = message.number / stride;
if (current_part >= part)
return;
else
current_part = part;
}
size_t approx_size = message.image.size + 2*1024*1024;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeImage(message);
socket.Send(serialization_buffer.data(), serializer.GetBufferSize());
}

View File

@@ -3,10 +3,10 @@
#ifndef JUNGFRAUJOCH_ZMQPREVIEWPUBLISHER_H
#define JUNGFRAUJOCH_ZMQPREVIEWPUBLISHER_H
#include "ZMQWrappers.h"
#include "DiffractionExperiment.h"
#include "../common/ZMQWrappers.h"
#include "../common/DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
#include "../frame_serialize/CBORMessages.h"
#include "JFJochMessages.h"
class ZMQPreviewPublisher {
ZMQSocket socket;
@@ -17,8 +17,10 @@ class ZMQPreviewPublisher {
mutable std::mutex m;
public:
ZMQPreviewPublisher(ZMQContext& context, const std::string& addr);
void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride);
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number);
void SetPreviewStride(int64_t preview_stride);
void StartDataCollection(const StartMessage& message);
void SendImage(const DataMessage& message);
};

View File

@@ -1,9 +1,9 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include "ZMQImagePusher.h"
#include "JFJochException.h"
#include "ZMQStream2Pusher.h"
#include "CBORStream2Serializer.h"
ZMQImagePusher::ZMQImagePusher(ZMQContext &zmq_context, const std::vector<std::string> &addr,
ZMQStream2Pusher::ZMQStream2Pusher(ZMQContext &zmq_context, const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
@@ -20,7 +20,7 @@ ZMQImagePusher::ZMQImagePusher(ZMQContext &zmq_context, const std::vector<std::s
}
}
ZMQImagePusher::ZMQImagePusher(const std::vector<std::string> &addr,
ZMQStream2Pusher::ZMQStream2Pusher(const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
@@ -39,35 +39,44 @@ ZMQImagePusher::ZMQImagePusher(const std::vector<std::string> &addr,
}
}
void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
void ZMQStream2Pusher::SendImage(const DataMessage& message) {
if (sockets.empty())
return;
auto socket_number = (image_number % file_count) % sockets.size();
sockets[socket_number]->Send(image_data, image_size);
auto socket_number = (message.number % file_count) % sockets.size();
size_t approx_size = message.image.size + 2*1024*1024;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeImage(message);
sockets[socket_number]->Send(serialization_buffer.data(), serializer.GetBufferSize());
}
void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
if (sockets.empty())
return;
auto socket_number = (image_number % file_count) % sockets.size();
sockets[socket_number]->SendZeroCopy(image_data, image_size, z);
}
void ZMQImagePusher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t data_file_count) {
if (data_file_count < 1)
void ZMQStream2Pusher::StartDataCollection(const StartMessage& message) {
if (message.data_file_count < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"File count cannot be zero or negative");
file_count = data_file_count;
file_count = message.data_file_count;
size_t approx_size = 1024*1024;
for (const auto &x : message.pixel_mask)
approx_size += x.size;
for (const auto &x : message.calibration)
approx_size += x.size;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(message);
for (const auto &s: sockets)
s->Send(image_data, image_size, true);
s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
}
void ZMQImagePusher::EndDataCollection(const EndMessage& message) {
void ZMQStream2Pusher::EndDataCollection(const EndMessage& message) {
std::vector<uint8_t> serialization_buffer(80*1024*1024);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
EndMessage end_message = message;

View File

@@ -0,0 +1,28 @@
// Copyright (2019-2023) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#define JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#include "ImagePusher.h"
#include "../common/ThreadSafeFIFO.h"
#include "../common/ZMQWrappers.h"
#include "../common/DiffractionSpot.h"
#include "JFJochMessages.h"
class ZMQStream2Pusher : public ImagePusher {
std::vector<std::unique_ptr<ZMQContext>> contexts;
std::vector<std::unique_ptr<ZMQSocket>> sockets;
int64_t file_count = 1;
public:
ZMQStream2Pusher(ZMQContext &context, const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
// High performance implementation, where each socket has dedicated ZMQ context
explicit ZMQStream2Pusher(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
void StartDataCollection(const StartMessage& message) override;
void SendImage(const DataMessage& message) override;
void EndDataCollection(const EndMessage& message) override;
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H

View File

@@ -3,7 +3,7 @@ ADD_LIBRARY(JFJochReceiver STATIC
JFJochReceiverTest.cpp JFJochReceiverTest.h
JFJochReceiver.cpp JFJochReceiver.h)
TARGET_LINK_LIBRARIES(JFJochReceiver ImageAnalysis JungfraujochAcqusitionDevice CommonFunctions HLSSimulation)
TARGET_LINK_LIBRARIES(JFJochReceiver ImagePusher ImageAnalysis JungfraujochAcqusitionDevice CommonFunctions HLSSimulation)
ADD_EXECUTABLE(jfjoch_action_test jfjoch_action_test.cpp)
TARGET_LINK_LIBRARIES(jfjoch_action_test JFJochReceiver)

View File

@@ -4,7 +4,6 @@
#include <thread>
#include "../jungfrau/JFPedestalCalc.h"
#include "../image_analysis/IndexerWrapper.h"
#include "../common/DiffractionGeometry.h"
@@ -23,7 +22,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
AcquisitionDeviceGroup &in_aq_device,
ImagePusher &in_image_sender,
Logger &in_logger, int64_t in_forward_and_sum_nthreads,
int64_t in_send_buffer_count,
ZMQPreviewPublisher* in_preview_publisher,
const NUMAHWPolicy &in_numa_policy) :
experiment(in_experiment),
@@ -36,14 +34,11 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
ndatastreams(experiment.GetDataStreamsNum()),
data_acquisition_ready(ndatastreams),
frame_transformation_ready((experiment.GetImageNum() > 0) ? frame_transformation_nthreads : 0),
send_buffer_count(in_send_buffer_count),
indexing_solution_per_file(experiment.GetDataFileCount()),
numa_policy(in_numa_policy),
adu_histogram_module(experiment.GetModulesNum())
{
send_buffer = (uint8_t *) malloc(send_buffer_size * send_buffer_count);
try {
try {
if (calibration != nullptr) {
one_byte_mask = calibration->CalculateOneByteMask(experiment);
} else {
@@ -51,11 +46,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
for (auto &i: one_byte_mask) i = 1;
}
for (uint32_t i = 0; i < send_buffer_count; i++) {
send_buffer_avail.Put(i);
send_buffer_zero_copy_ret_val.emplace_back(send_buffer_avail, i);
}
if (!experiment.CheckGitSha1Consistent())
logger.Warning(experiment.CheckGitSha1Msg());
@@ -118,7 +108,7 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
size_t ypixel = experiment.GetYPixelsNum();
pixel_mask = compressor.Compress(calibration->CalculateNexusMask(experiment, 0));
message.AddPixelMask(CBORImage{
message.AddPixelMask(CompressedImage{
.data = pixel_mask.data(),
.size = pixel_mask.size(),
.xpixel = (size_t) xpixel,
@@ -139,7 +129,7 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
if (experiment.GetStorageCellNumber() > 1)
channel += "_sc" + std::to_string(sc);
CBORImage image{
CompressedImage image{
.data = pedestal.at(pedestal.size() - 1).data(),
.size = pedestal.at(pedestal.size() - 1).size(),
.xpixel = (size_t) xpixel,
@@ -164,17 +154,13 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
} else
message.rad_int_bin_number = 0;
std::vector<uint8_t> serialization_buffer(message.approx_size);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
if (preview_publisher != nullptr) {
preview_publisher->SetPreviewStride(experiment.GetPreviewStride());
preview_publisher->StartDataCollection(message);
}
serializer.SerializeSequenceStart(message);
if (preview_publisher != nullptr)
preview_publisher->StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(),
experiment.GetPreviewStride());
if (push_images_to_writer)
image_pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(),
experiment.GetDataFileCount());
image_pusher.StartDataCollection(message);
for (int i = 0; i < experiment.GetImageNum(); i++)
images_to_go.Put(i);
@@ -200,7 +186,6 @@ JFJochReceiver::JFJochReceiver(const DiffractionExperiment& in_experiment,
measurement = std::async(std::launch::async, &JFJochReceiver::FinalizeMeasurement, this);
} catch (...) {
free(send_buffer);
throw;
}
}
@@ -403,28 +388,15 @@ void JFJochReceiver::FrameTransformationThread() {
*rad_int_profile_per_file[image_number % experiment.GetDataFileCount()] += *rad_int_profile_image;
}
message.receiver_available_send_buffers = GetAvailableSendBuffers();
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
PrepareCBORImage(message, experiment, nullptr, 0);
serializer.SerializeImage(message);
if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Not enough memory to save image");
size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset());
serializer.AppendImage(image_size);
message.image = transformation.GetCompressedImage();
if (preview_publisher && (!local_data_processing_settings.preview_indexed_only || indexed))
preview_publisher->SendImage(ptr, serializer.GetBufferSize(), image_number);
preview_publisher->SendImage(message);
if (push_images_to_writer) {
image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number,
&send_buffer_zero_copy_ret_val[send_buffer_handle]);
compressed_size += image_size;
} else
send_buffer_avail.Put(send_buffer_handle);
image_pusher.SendImage(message);
compressed_size += message.image.size;
}
UpdateMaxImage(image_number);
images_sent++;
@@ -568,9 +540,6 @@ void JFJochReceiver::FinalizeMeasurement() {
for (auto &future : data_acquisition_futures)
future.get();
for (int i = 0; i < send_buffer_count; i++)
send_buffer_avail.GetBlocking();
RetrievePedestal();
logger.Info("Devices stopped");
@@ -597,7 +566,6 @@ void JFJochReceiver::StopReceiver() {
JFJochReceiver::~JFJochReceiver() {
if (measurement.valid())
measurement.get();
free(send_buffer);
}
DataProcessingSettings JFJochReceiver::GetDataProcessingSettings() {
@@ -627,7 +595,6 @@ Plot JFJochReceiver::GetPlots(const PlotRequest &request) {
return indexing_solution_per_file.GetPlot();
case PlotType::ADUHistorgram:
return adu_histogram_total.GetPlot();
break;
default:
// Do nothing
break;
@@ -665,14 +632,10 @@ void JFJochReceiver::UpdateMaxDelay(int64_t delay) {
max_delay = delay;
}
float JFJochReceiver::GetAvailableSendBuffers() const {
return static_cast<float>(send_buffer_avail.Size()) / static_cast<float>(send_buffer_count);
}
JFJochReceiverStatus JFJochReceiver::GetStatus() const {
return {
.progress = GetProgress(),
.indexing_rate = GetIndexingRate(),
.send_buffers_avail = GetAvailableSendBuffers()
.send_buffers_avail = 0
};
}

View File

@@ -15,10 +15,10 @@
#include "../common/DiffractionExperiment.h"
#include "../common/JFJochException.h"
#include "../common/FrameTransformation.h"
#include "../common/ImagePusher.h"
#include "../frame_serialize/ImagePusher.h"
#include "../common/Logger.h"
#include "../common/ThreadSafeFIFO.h"
#include "../common/ZMQPreviewPublisher.h"
#include "../frame_serialize/ZMQPreviewPublisher.h"
#include "../common/NUMAHWPolicy.h"
#include "../common/StatusVector.h"
#include "../common/Histogram.h"
@@ -118,14 +118,6 @@ class JFJochReceiver {
int64_t max_image_number_sent = 0;
std::mutex max_image_number_sent_mutex;
const size_t send_buffer_size = experiment.GetMaxCompressedSize() + 1024*1024;
// max compressed size + 1 MiB reserve for spots and rad. integration results
const size_t send_buffer_count;
ThreadSafeFIFO<uint32_t> send_buffer_avail;
uint8_t *send_buffer;
std::vector<ZeroCopyReturnValue> send_buffer_zero_copy_ret_val;
NUMAHWPolicy numa_policy;
void AcquireThread(uint16_t data_stream);
@@ -142,7 +134,6 @@ public:
AcquisitionDeviceGroup &acquisition_devices,
ImagePusher &image_pusher,
Logger &logger, int64_t forward_and_sum_nthreads,
int64_t send_buffer_count,
ZMQPreviewPublisher* preview_publisher,
const NUMAHWPolicy &numa_policy);
~JFJochReceiver();
@@ -160,7 +151,6 @@ public:
void SetDataProcessingSettings(const DataProcessingSettings &data_processing_settings);
float GetAvailableSendBuffers() const;
Plot GetPlots(const PlotRequest& request);
RadialIntegrationProfiles GetRadialIntegrationProfiles();
};

View File

@@ -72,8 +72,7 @@ void JFJochReceiverService::Start(const DiffractionExperiment &experiment, const
receiver = std::make_unique<JFJochReceiver>(experiment, calibration,
aq_devices, image_pusher,
logger, nthreads, send_buffer_count,
preview_publisher, numa_policy);
logger, nthreads, preview_publisher, numa_policy);
try {
// Don't want to stop
receiver->SetDataProcessingSettings(data_processing_settings);

View File

@@ -2,8 +2,8 @@
#include "JFJochReceiverTest.h"
#include "JFJochReceiverService.h"
#include "../common/ZMQImagePusher.h"
#include "../common/TestImagePusher.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../frame_serialize/TestImagePusher.h"
#define STORAGE_CELL_FOR_TEST 11

View File

@@ -2,15 +2,15 @@
#include <catch2/catch.hpp>
#include "../frame_serialize/JFJochFrameSerializer.h"
#include "../frame_serialize/JFJochFrameDeserializer.h"
#include "../frame_serialize/CBORStream2Serializer.h"
#include "../frame_serialize/CBORStream2Deserializer.h"
#include "../compression/JFJochCompressor.h"
#include "stream2.h"
#include "../frame_serialize/CborUtil.h"
TEST_CASE("CBORSerialize_Start", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
StartMessage message {
.data_file_count = 3,
@@ -65,9 +65,9 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") {
REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START);
StartMessage output_message;
REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage());
@@ -127,11 +127,11 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") {
TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<uint32_t> mask(456*457, 15);
CBORImage image_mask {
CompressedImage image_mask {
.data = reinterpret_cast<uint8_t *>(mask.data()),
.size = 456 * 457 * sizeof(uint32_t),
.xpixel = 456,
@@ -148,9 +148,9 @@ TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") {
message.AddPixelMask(image_mask);
REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START);
StartMessage output_message;
REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage());
@@ -164,7 +164,7 @@ TEST_CASE("CBORSerialize_Start_PixelMask", "[CBOR]") {
TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<float> calib1(256);
std::vector<float> calib2(256);
@@ -174,7 +174,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") {
calib2[i] = i * 76.33456;
}
CBORImage image1 {
CompressedImage image1 {
.data = reinterpret_cast<uint8_t *>(calib1.data()),
.size = 16 * 16 * sizeof(float),
.xpixel = 16,
@@ -186,7 +186,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") {
.channel = "calib1"
};
CBORImage image2 {
CompressedImage image2 {
.data = reinterpret_cast<uint8_t *>(calib2.data()),
.size = 16 * 16 * sizeof(float),
.xpixel = 16,
@@ -204,9 +204,9 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") {
message.AddCalibration(image1);
REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::START);
StartMessage output_message;
REQUIRE_NOTHROW(output_message = deserializer.GetStartMessage());
@@ -225,7 +225,7 @@ TEST_CASE("CBORSerialize_Start_Calibration", "[CBOR]") {
TEST_CASE("CBORSerialize_End", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
EndMessage message {
.number_of_images = 57789,
@@ -239,9 +239,9 @@ TEST_CASE("CBORSerialize_End", "[CBOR]") {
REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END);
EndMessage output_message{};
REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage());
@@ -257,7 +257,7 @@ TEST_CASE("CBORSerialize_End", "[CBOR]") {
TEST_CASE("CBORSerialize_End_RadIntResult", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
EndMessage message {
.number_of_images = 57789,
@@ -274,9 +274,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END);
EndMessage output_message{};
REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage());
@@ -289,7 +289,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
EndMessage message {
.number_of_images = 57789,
@@ -307,9 +307,9 @@ TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") {
REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::END);
EndMessage output_message{};
REQUIRE_NOTHROW(output_message = deserializer.GetEndMessage());
@@ -324,7 +324,7 @@ TEST_CASE("CBORSerialize_End_ADUHistogram", "[CBOR]") {
TEST_CASE("CBORSerialize_Image", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -332,7 +332,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
CompressedImage image {
.data = test.data(),
.size = 1024,
.xpixel = 256,
@@ -362,9 +362,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION);
@@ -392,7 +392,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_Image_2", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -400,7 +400,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
CompressedImage image {
.data = test.data(),
.size = 1024 * 512,
.xpixel = 1024,
@@ -421,9 +421,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION);
@@ -442,7 +442,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -450,7 +450,7 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") {
for (int i = 0; i < test.size(); i++)
test[i] = i * 0.1f;
CBORImage image {
CompressedImage image {
.data = reinterpret_cast<uint8_t *>(test.data()),
.size = 1024 * 512 * sizeof(float),
.xpixel = 1024,
@@ -471,9 +471,9 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") {
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION);
@@ -490,60 +490,9 @@ TEST_CASE("CBORSerialize_Image_Float", "[CBOR]") {
REQUIRE(memcmp(image_array.image.data, test.data(), test.size() * sizeof(float)) == 0);
}
TEST_CASE("CBORSerialize_Image_Append", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
std::vector<uint8_t> test(512*1024);
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
.data = nullptr,
.size = 0,
.xpixel = 1024,
.ypixel = 512,
.pixel_depth_bytes = 1,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::NO_COMPRESSION,
.channel = "default"
};
DataMessage message {
.number = 480,
.image = image,
.spots = spots,
.indexing_result = 3
};
REQUIRE_NOTHROW(serializer.SerializeImage(message));
memcpy(buffer.data() + serializer.GetImageAppendOffset(), test.data(), 512*1024);
REQUIRE_NOTHROW(serializer.AppendImage(512*1024));
JFJochFrameDeserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION);
REQUIRE(image_array.image.xpixel == 1024);
REQUIRE(image_array.image.ypixel == 512);
REQUIRE(image_array.image.pixel_depth_bytes == 1);
REQUIRE(!image_array.image.pixel_is_signed);
REQUIRE(!image_array.image.pixel_is_float);
REQUIRE(image_array.image.channel == "default");
REQUIRE(image_array.image.size == test.size());
REQUIRE(image_array.indexing_result == message.indexing_result);
REQUIRE(image_array.number == 480);
REQUIRE(memcmp(image_array.image.data, test.data(), test.size()) == 0);
}
TEST_CASE("CBORSerialize_Image_Compressed", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -551,7 +500,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
CompressedImage image {
.data = test.data(),
.size = 512,
.xpixel = 256,
@@ -571,9 +520,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.image.algorithm == CompressionAlgorithm::BSHUF_LZ4);
@@ -590,13 +539,13 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") {
std::vector<uint8_t> buffer(8 * 1024 * 1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<uint8_t> test(1024);
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image{
CompressedImage image{
.data = test.data(),
.size = 1024,
.xpixel = 256,
@@ -614,9 +563,9 @@ TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") {
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.number == 789);
@@ -628,7 +577,7 @@ TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") {
TEST_CASE("CBORSerialize_Image_Spots", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
spots.push_back(SpotToSave{.x = 7, .y = 8, .intensity = 34, .indexed = false});
@@ -638,7 +587,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
CompressedImage image {
.data = test.data(),
.size = 1024,
.xpixel = 256,
@@ -656,9 +605,9 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
REQUIRE_NOTHROW(serializer.SerializeImage(message));
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
REQUIRE_NOTHROW(deserializer.Process(buffer.data(), serializer.GetBufferSize()));
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
REQUIRE(deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE);
auto image_array = deserializer.GetDataMessage();
REQUIRE(image_array.number == 789);
@@ -683,11 +632,11 @@ inline bool CmpString(const char *str1, const std::string& str2) {
TEST_CASE("CBORSerialize_Start_stream2", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<uint32_t> mask(456*457, 15);
CBORImage image_mask {
CompressedImage image_mask {
.data = reinterpret_cast<uint8_t *>(mask.data()),
.size = 456 * 457 * sizeof(uint32_t),
.xpixel = 456,
@@ -783,7 +732,7 @@ TEST_CASE("CBORSerialize_Start_stream2", "[CBOR]") {
TEST_CASE("CBORSerialize_End_stream2", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
EndMessage message {
.number_of_images = 57789,
@@ -813,7 +762,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_Image_compressed_stream2", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -826,7 +775,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
size_t compressed_size = compressor.Compress(compressed_test.data(), test);
CBORImage image {
CompressedImage image {
.data = compressed_test.data(),
.size = compressed_size,
.xpixel = 1024,
@@ -870,7 +819,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
TEST_CASE("CBORSerialize_Image_uncompressed_stream2", "[CBOR]") {
std::vector<uint8_t> buffer(8*1024*1024);
JFJochFrameSerializer serializer(buffer.data(), buffer.size());
CBORStream2Serializer serializer(buffer.data(), buffer.size());
std::vector<SpotToSave> spots;
@@ -878,7 +827,7 @@ JFJochFrameSerializer serializer(buffer.data(), buffer.size());
for (int i = 0; i < test.size(); i++)
test[i] = (i * 253 + 56) % 256;
CBORImage image {
CompressedImage image {
.data = test.data(),
.size = 1024 * 512,
.xpixel = 1024,

View File

@@ -10,6 +10,16 @@
using namespace std::literals::chrono_literals;
inline uint32_t read_be32(const void *ptr) {
auto ptr32 = (uint32_t *) ptr;
return __builtin_bswap32(ptr32[0]);
}
inline uint64_t read_be64(const void *ptr) {
auto ptr64 = (uint64_t *) ptr;
return __builtin_bswap64(ptr64[0]);
}
TEST_CASE("Bshuf_SSE", "[bitshuffle]") {
REQUIRE (bshuf_using_SSE2() == 1);
}
@@ -36,14 +46,13 @@ TEST_CASE("FrameTransformation_Raw_NoCompression" ,"") {
for (int i = 0; i < nmodules*RAW_MODULE_SIZE; i++)
input_1[i] = dist(g1);
std::vector<int16_t> output(experiment.GetPixelsNum());
for (int i = 0; i < nmodules; i++) {
REQUIRE_NOTHROW(transformation.ProcessModule(input_0.data() + i * RAW_MODULE_SIZE, i, 0));
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
REQUIRE(transformation.SaveCompressedImage(output.data()) == experiment.GetPixelDepth() * experiment.GetPixelsNum());
auto image = transformation.GetCompressedImage();
REQUIRE(image.size == experiment.GetPixelDepth() * experiment.GetPixelsNum());
auto output = (int16_t *) image.data;
uint32_t diff_0 = 0;
uint32_t diff_1 = 0;
@@ -76,14 +85,14 @@ TEST_CASE("FrameTransformation_Converted_NoCompression" ,"") {
for (int i = 0; i < nmodules*RAW_MODULE_SIZE; i++)
input_1[i] = dist(g1);
std::vector<int16_t> output(experiment.GetPixelsNum());
for (int i = 0; i < nmodules; i++) {
REQUIRE_NOTHROW(transformation.ProcessModule(input_0.data() + i * RAW_MODULE_SIZE, i, 0));
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
REQUIRE(transformation.SaveCompressedImage(output.data()) == experiment.GetPixelDepth() * experiment.GetPixelsNum());
auto image = transformation.GetCompressedImage();
REQUIRE(image.size == experiment.GetPixelDepth() * experiment.GetPixelsNum());
auto output = (int16_t *) image.data;
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);
REQUIRE(input_0[511*1024+256]/2 == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 258]);
@@ -134,15 +143,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_lz4" ,"") {
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
size_t compressed_size;
REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data()));
output_compressed.resize(compressed_size);
auto image = transformation.GetCompressedImage();
REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
std::vector<int16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed,
std::vector<uint16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size,
experiment.GetPixelsNum()));
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);
@@ -193,15 +200,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd" ,"") {
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
size_t compressed_size;
REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data()));
auto image = transformation.GetCompressedImage();
REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
output_compressed.resize(compressed_size);
std::vector<int16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed,
std::vector<uint16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size,
experiment.GetPixelsNum()));
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);
@@ -252,15 +257,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_rle" ,"") {
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
size_t compressed_size;
REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data()));
auto image = transformation.GetCompressedImage();
REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
output_compressed.resize(compressed_size);
std::vector<int16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed,
std::vector<uint16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size,
experiment.GetPixelsNum()));
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);
@@ -314,15 +317,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_32bit" ,"") {
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
size_t compressed_size;
REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data()));
auto image = transformation.GetCompressedImage();
REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
output_compressed.resize(compressed_size);
std::vector<int32_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed,
std::vector<uint32_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size,
experiment.GetPixelsNum()));
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);
@@ -381,15 +382,13 @@ TEST_CASE("FrameTransformation_Converted_bshuf_zstd_unsigned_16bit" ,"") {
REQUIRE_NOTHROW(transformation.ProcessModule(input_1.data() + i * RAW_MODULE_SIZE, i, 1));
}
size_t compressed_size;
REQUIRE_NOTHROW(compressed_size = transformation.SaveCompressedImage(output_compressed.data()));
auto image = transformation.GetCompressedImage();
REQUIRE(bshuf_read_uint64_BE(output_compressed.data()) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(bshuf_read_uint32_BE(output_compressed.data()+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
REQUIRE(read_be64(image.data) == experiment.GetPixelsNum() * experiment.GetPixelDepth());
REQUIRE(read_be32(image.data+8) == JFJochBitShuffleCompressor::DefaultBlockSize * experiment.GetPixelDepth());
output_compressed.resize(compressed_size);
std::vector<uint16_t> output;
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), output_compressed,
REQUIRE_NOTHROW(JFJochDecompress(output, experiment.GetCompressionAlgorithm(), image.data, image.size,
experiment.GetPixelsNum()));
REQUIRE(input_0[511*1024] == output[CONVERTED_MODULE_SIZE * (2 * nmodules - 2) + 0]);

View File

@@ -5,7 +5,7 @@
#include "../receiver/JFJochReceiverTest.h"
#include "../acquisition_device/HLSSimulatedDevice.h"
#include "../jungfrau/JFPedestalCalc.h"
#include "../common/TestImagePusher.h"
#include "../frame_serialize/TestImagePusher.h"
using namespace std::literals::chrono_literals;

View File

@@ -4,7 +4,7 @@
#include <filesystem>
#include "../writer/StreamWriter.h"
#include "../common/ZMQImagePusher.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../receiver/JFJochReceiverService.h"
TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
@@ -23,7 +23,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
for (int i = 0; i < x.GetDataStreamsNum(); i++)
aq_devices.AddHLSDevice(64);
ZMQImagePusher pusher (context, {zmq_addr});
ZMQStream2Pusher pusher (context, {zmq_addr});
JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher);
JFJochReceiverOutput receiver_output;

View File

@@ -2,7 +2,7 @@
#include <catch2/catch.hpp>
#include "../writer/ZMQImagePuller.h"
#include "../common/ZMQImagePusher.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
void test_puller(ZMQImagePuller *puller,
const DiffractionExperiment& x,
@@ -15,13 +15,13 @@ void test_puller(ZMQImagePuller *puller,
std::vector<size_t> &nimages) {
puller->WaitForImage();
if (puller->GetFrameType() != JFJochFrameDeserializer::Type::START) {
if (puller->GetFrameType() != CBORStream2Deserializer::Type::START) {
diff_content[writer_id]++;
return;
}
puller->WaitForImage();
while (puller->GetFrameType() != JFJochFrameDeserializer::Type::END) {
if (puller->GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) {
while (puller->GetFrameType() != CBORStream2Deserializer::Type::END) {
if (puller->GetFrameType() == CBORStream2Deserializer::Type::IMAGE) {
auto image = puller->GetDataMessage();
if ((nwriter > 1) && (image.number % nwriter != writer_id))
diff_split[writer_id]++;
@@ -76,15 +76,13 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
ZMQImagePuller puller(context);
ZMQImagePusher pusher(context, {zmq_addr});
ZMQStream2Pusher pusher(context, {zmq_addr});
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
puller.Connect(zmq_addr);
std::thread sender_thread = std::thread([&] {
std::vector<uint8_t> serialization_buffer(16*1024*1024);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
StartMessage message {
.data_file_count = 16
};
@@ -92,14 +90,12 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
.write_master_file = true
};
serializer.SerializeSequenceStart(message);
pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count);
pusher.StartDataCollection(message);
for (int i = 0; i < nframes; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
serializer.SerializeImage(data_message);
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
pusher.SendImage(data_message);
}
pusher.EndDataCollection(end_message);
@@ -148,7 +144,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("inproc://#" + std::to_string(i));
ZMQImagePusher pusher(context, zmq_addr);
ZMQStream2Pusher pusher(context, zmq_addr);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
@@ -161,23 +157,19 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);
std::thread sender_thread = std::thread([&] {
std::vector<uint8_t> serialization_buffer(16*1024*1024);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
StartMessage message {
.data_file_count = 16
};
EndMessage end_message{
.write_master_file = true
};
serializer.SerializeSequenceStart(message);
pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count);
pusher.StartDataCollection(message);
for (int i = 0; i < nframes; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
serializer.SerializeImage(data_message);
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
pusher.SendImage(data_message);
}
pusher.EndDataCollection(end_message);
@@ -238,7 +230,7 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("inproc://#" + std::to_string(i));
ZMQImagePusher pusher(context, zmq_addr);
ZMQStream2Pusher pusher(context, zmq_addr);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
@@ -252,23 +244,19 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);
std::thread sender_thread = std::thread([&] {
std::vector<uint8_t> serialization_buffer(16*1024*1024);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
StartMessage message {
StartMessage message {
.data_file_count = 16
};
EndMessage end_message{
.write_master_file = true
};
serializer.SerializeSequenceStart(message);
pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(), message.data_file_count);
pusher.StartDataCollection(message);
for (int i = 0; i < nframes; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
serializer.SerializeImage(data_message);
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
pusher.SendImage(data_message);
}
pusher.EndDataCollection(end_message);

View File

@@ -1,8 +1,9 @@
// Copyright (2019-2023) Paul Scherrer Institute
#include <catch2/catch.hpp>
#include "../common/ZMQPreviewPublisher.h"
#include "../frame_serialize/ZMQPreviewPublisher.h"
/*
TEST_CASE("ZMQPreviewPublisher","[ZMQ]") {
ZMQContext context;
ZMQPreviewPublisher publisher(context, "inproc://#5");
@@ -75,3 +76,4 @@ TEST_CASE("ZMQPreviewPublisher_FrameNumbers","[ZMQ]") {
REQUIRE(socket.Receive(s, false) < 0);
}
*/

View File

@@ -23,7 +23,8 @@ double CheckCompressionThread(const DiffractionExperiment &x,
for (int j = 0; j < x.GetModulesNum(); j++ ) {
transformation.ProcessModule(image.data() + (j + i * x.GetModulesNum()) * RAW_MODULE_SIZE, j, 0);
}
ret += transformation.SaveCompressedImage(output.data() + i * x.GetMaxCompressedSize());
auto image = transformation.GetCompressedImage();
ret += image.size;
}
return ret;
}
@@ -69,8 +70,10 @@ std::string CheckDecompression(const DiffractionExperiment &x, size_t nimages, c
for (int j = 0; j < x.GetModulesNum(); j++ ) {
transformation.ProcessModule(image.data() + (j + i * x.GetModulesNum()) * RAW_MODULE_SIZE, j, 0);
}
compressed_size[i] = transformation.SaveCompressedImage(output[i].data());
output[i].resize(compressed_size[i]);
auto image = transformation.GetCompressedImage();
compressed_size[i] = image.size;
output[i].resize(image.size);
memcpy(output[i].data(), image.data, image.size);
}
std::vector<uint16_t> decompress_v;

View File

@@ -88,7 +88,12 @@ int main(int argc, char **argv) {
for (int i = 0; i < nimages; i++) {
for (int j = 0; j < 8; j++)
transformation.ProcessModule(image + (i * x.GetModulesNum() + j) * RAW_MODULE_SIZE, j, 0);
output_size[i] = transformation.SaveCompressedImage(output[i].data());
auto image = transformation.GetCompressedImage();
output_size[i] = image.size;
output[i].resize(image.size);
memcpy(output[i].data(), image.data, image.size);
}
x.ImagesPerTrigger(nimages_out);
@@ -102,7 +107,7 @@ int main(int argc, char **argv) {
{
size_t xpixel = x.GetXPixelsNum();
size_t ypixel = x.GetYPixelsNum();
start_message.AddPixelMask(CBORImage{
start_message.AddPixelMask(CompressedImage{
.data = reinterpret_cast<uint8_t *>(pixel_mask.data()),
.size = pixel_mask.size() * sizeof(uint32_t),
.xpixel = xpixel,

View File

@@ -6,7 +6,7 @@
#include "../common/Logger.h"
#include "../common/FrameTransformation.h"
#include "../common/RawToConvertedGeometry.h"
#include "../common/ZMQImagePusher.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#define BASE_TCP_PORT 8000
@@ -56,7 +56,7 @@ int main(int argc, char **argv) {
x.DataFileCount(nsockets);
ZMQImagePusher pusher(context, zmq_addr);
ZMQStream2Pusher pusher(context, zmq_addr);
FrameTransformation transformation(x);
@@ -78,28 +78,28 @@ int main(int argc, char **argv) {
for (int j = 0; j < x.GetModulesNum(); j++)
transformation.ProcessModule(image_tmp_raw.data() + j * RAW_MODULE_SIZE, j, 0);
output_size[i] = transformation.SaveCompressedImage((uint8_t *) output[i].data());
auto image = transformation.GetCompressedImage();
output_size[i] = image.size;
output[i].resize(image.size);
memcpy(output[i].data(), image.data, image.size);
}
logger.Info("Sending {} images", nimages_out);
std::vector<DiffractionSpot> empty_spot_vector;
std::vector<uint8_t> send_buffer(x.GetPixelsNum() * x.GetPixelDepth() * 2);
JFJochFrameSerializer serializer(send_buffer.data(), send_buffer.size());
StartMessage start_message;
x.FillMessage(start_message);
serializer.SerializeSequenceStart(start_message);
pusher.StartDataCollection(send_buffer.data(), serializer.GetBufferSize(), x.GetDataFileCount());
pusher.StartDataCollection(start_message);
for (int i = 0; i < nimages_out; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, output[i % nimages_in_file].data(), output_size[i % nimages_in_file]);
serializer.SerializeImage(data_message);
pusher.SendImage(send_buffer.data(), serializer.GetBufferSize(), i);
pusher.SendImage(data_message);
}
EndMessage end_message{};

View File

@@ -13,7 +13,7 @@ ADD_LIBRARY(JFJochWriter STATIC
ZMQImagePuller.cpp ZMQImagePuller.h
StreamWriter.cpp StreamWriter.h)
TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions)
TARGET_LINK_LIBRARIES(JFJochWriter HDF5Wrappers CommonFunctions CBORStream2FrameSerialize)
TARGET_LINK_LIBRARIES(jfjoch_writer JFJochWriter)

View File

@@ -9,7 +9,7 @@
#include "HDF5Objects.h"
#include "../common/SpotToSave.h"
#include "../frame_serialize/CBORMessages.h"
#include "../frame_serialize/JFJochMessages.h"
struct HDF5DataFileStatistics {
std::string filename;

View File

@@ -194,7 +194,7 @@ void HDF5Metadata::DetectorModule(HDF5File *hdf5_file, const std::string &name,
"", "", "translation", {0,0,0});
}
void HDF5Metadata::SaveCBORImage(HDF5File *hdf5_file, const std::string &hdf5_path, const CBORImage &image) {
void HDF5Metadata::SaveCBORImage(HDF5File *hdf5_file, const std::string &hdf5_path, const CompressedImage &image) {
std::vector<hsize_t> dims = {image.ypixel, image.xpixel};
HDF5DataType data_type(image.pixel_depth_bytes, image.pixel_is_signed);

View File

@@ -3,7 +3,7 @@
#ifndef JUNGFRAUJOCH_HDF5NXMX_H
#define JUNGFRAUJOCH_HDF5NXMX_H
#include "../frame_serialize/CBORMessages.h"
#include "../frame_serialize/JFJochMessages.h"
#include "HDF5Objects.h"
@@ -25,7 +25,7 @@ namespace HDF5Metadata {
void Mask(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end);
void Calibration(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end);
void SaveCBORImage(HDF5File *hdf5_file, const std::string& hdf5_path, const CBORImage &image);
void SaveCBORImage(HDF5File *hdf5_file, const std::string& hdf5_path, const CompressedImage &image);
void RadInt(HDF5File *hdf5_file, const StartMessage &start, const EndMessage &end);
void ADUHistogram(HDF5File *hdf5_file, const EndMessage &end);
std::string DataFileName(const std::string& prefix, int64_t file_number);

View File

@@ -6,7 +6,7 @@
#include <numeric>
#include "HDF5DataFile.h"
#include "../frame_serialize/CBORMessages.h"
#include "../frame_serialize/JFJochMessages.h"
class HDF5Writer {
std::vector<std::unique_ptr<HDF5DataFile> > files;

View File

@@ -14,7 +14,7 @@ StreamWriter::StreamWriter(ZMQContext &context, Logger &in_logger, const std::st
void StreamWriter::StartDataCollection() {
image_puller.WaitForImage();
while (image_puller.GetFrameType() != JFJochFrameDeserializer::Type::START) {
while (image_puller.GetFrameType() != CBORStream2Deserializer::Type::START) {
logger.Error("Expected START image");
image_puller.WaitForImage();
}
@@ -28,7 +28,7 @@ void StreamWriter::StartDataCollection() {
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
HDF5Writer writer(start_message);
image_puller.WaitForImage();
while (image_puller.GetFrameType() == JFJochFrameDeserializer::Type::IMAGE) {
while (image_puller.GetFrameType() == CBORStream2Deserializer::Type::IMAGE) {
auto image_array = image_puller.GetDataMessage();
writer.Write(image_array);
image_puller.WaitForImage();
@@ -37,7 +37,7 @@ void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
}
void StreamWriter::EndDataCollection() {
while (image_puller.GetFrameType() != JFJochFrameDeserializer::Type::END) {
while (image_puller.GetFrameType() != CBORStream2Deserializer::Type::END) {
logger.Error("Expected END image");
image_puller.WaitForImage();
}

View File

@@ -36,14 +36,14 @@ void ZMQImagePuller::WaitForImage() {
if (!abort) {
deserializer.Process(zmq_recv_buffer);
if (deserializer.GetType() == JFJochFrameDeserializer::Type::START) {
if (deserializer.GetType() == CBORStream2Deserializer::Type::START) {
start_time = std::chrono::system_clock::now();
start_message = std::make_unique<StartMessage>(deserializer.GetStartMessage());
end_message.reset();
} else if (deserializer.GetType() == JFJochFrameDeserializer::Type::END) {
} else if (deserializer.GetType() == CBORStream2Deserializer::Type::END) {
end_message = std::make_unique<EndMessage>(deserializer.GetEndMessage());
end_time = std::chrono::system_clock::now();
} else if (deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE) {
} else if (deserializer.GetType() == CBORStream2Deserializer::Type::IMAGE) {
processed_images++;
deserialized_image_message = std::make_unique<DataMessage>(deserializer.GetDataMessage());
processed_size += deserialized_image_message->image.size;
@@ -58,9 +58,9 @@ const DataMessage &ZMQImagePuller::GetDataMessage() const {
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far");
}
JFJochFrameDeserializer::Type ZMQImagePuller::GetFrameType() const {
CBORStream2Deserializer::Type ZMQImagePuller::GetFrameType() const {
if (abort)
return JFJochFrameDeserializer::Type::END;
return CBORStream2Deserializer::Type::END;
else
return deserializer.GetType();
}

View File

@@ -8,7 +8,7 @@
#include "../common/ZMQWrappers.h"
#include "../common/Logger.h"
#include "../common/SpotToSave.h"
#include "../frame_serialize/JFJochFrameDeserializer.h"
#include "../frame_serialize/CBORStream2Deserializer.h"
struct ZMQImagePullerStatistics {
uint64_t processed_images;
@@ -19,7 +19,7 @@ struct ZMQImagePullerStatistics {
class ZMQImagePuller {
std::vector<uint8_t> zmq_recv_buffer;
JFJochFrameDeserializer deserializer;
CBORStream2Deserializer deserializer;
constexpr const static uint32_t ReceiverWaterMark = 100;
// ZeroMQ receive timeout allows to check for abort value from time to time
@@ -43,7 +43,7 @@ public:
void WaitForImage();
const DataMessage &GetDataMessage() const;
[[nodiscard]] JFJochFrameDeserializer::Type GetFrameType() const;
[[nodiscard]] CBORStream2Deserializer::Type GetFrameType() const;
[[nodiscard]] ZMQImagePullerStatistics GetStatistics();
[[nodiscard]] StartMessage GetStartMessage() const;
[[nodiscard]] EndMessage GetEndMessage() const;