1.0.0-rc2: Fixes in preview

This commit is contained in:
2024-05-16 17:41:47 +02:00
parent 2b945f6dfa
commit 949f693311
29 changed files with 295 additions and 265 deletions
+1 -1
View File
@@ -1 +1 @@
1.0.0-rc.1
1.0.0-rc.2
+9 -5
View File
@@ -2,7 +2,7 @@
// Using OpenAPI licensed with Apache License 2.0
#include <vector>
#include <signal.h>
#include <csignal>
#include <fstream>
#include <nlohmann/json.hpp>
@@ -12,7 +12,7 @@
#include "JFJochBrokerHttp.h"
#include "JFJochBrokerParser.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../frame_serialize/DumpCBORToFilePusher.h"
static Pistache::Http::Endpoint *httpEndpoint;
@@ -73,8 +73,6 @@ int main (int argc, char **argv) {
std::unique_ptr<JFJochReceiverService> receiver;
std::unique_ptr<ImagePusher> image_pusher;
ZMQContext context;
DiffractionExperiment experiment;
experiment.MaskChipEdges(true).MaskModuleEdges(true);
@@ -88,9 +86,15 @@ int main (int argc, char **argv) {
int32_t zmq_send_watermark = ParseInt32(input, "zmq_send_watermark", 100);
int32_t zmq_send_buffer_size = ParseInt32(input, "zmq_send_buffer_size", -1);
image_pusher = std::make_unique<ZMQStream2PusherGroup>(ParseStringArray(input, "zmq_image_addr"),
auto tmp = std::make_unique<ZMQStream2Pusher>(ParseStringArray(input, "zmq_image_addr"),
zmq_send_watermark,
zmq_send_buffer_size);
std::string preview_addr = ParseString(input, "zmq_preview_addr", "");
if (!preview_addr.empty())
tmp->PreviewSocket(preview_addr);
image_pusher = std::move(tmp);
} else if (pusher_type == "dump_cbor") {
image_pusher = std::make_unique<DumpCBORToFilePusher>();
} else
+2 -3
View File
@@ -8,8 +8,7 @@ ZMQContext::ZMQContext() {
// Default is to have 2 I/O threads per ZMQ context
if (zmq_ctx_set(context, ZMQ_IO_THREADS, 2) != 0)
throw JFJochException(JFJochExceptionCategory::ZeroMQ,
"Cannot set number of I/O threads");
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads");
}
ZMQContext &ZMQContext::NumThreads(int32_t threads) {
@@ -27,7 +26,7 @@ void *ZMQContext::GetContext() const {
return context;
}
ZMQSocket::ZMQSocket(ZMQContext &context, ZMQSocketType in_socket_type) : socket_type(in_socket_type) {
ZMQSocket::ZMQSocket(ZMQSocketType in_socket_type) : socket_type(in_socket_type) {
socket = zmq_socket(context.GetContext(), static_cast<int>(socket_type));
if (socket == nullptr)
+3 -2
View File
@@ -42,14 +42,15 @@ public:
class ZMQSocket {
std::mutex m;
ZMQContext context;
ZMQSocketType socket_type;
void *socket;
void SetSocketOption(int32_t option_name, int32_t value);
public:
ZMQSocket(ZMQSocket &socket) = delete;
const ZMQSocket& operator=(ZMQSocket &socket) = delete;
ZMQSocket(ZMQContext &context, ZMQSocketType socket_type);
~ZMQSocket();
explicit ZMQSocket(ZMQSocketType socket_type);
~ZMQSocket();
void Connect(const std::string& addr);
void Disconnect(const std::string& addr);
void Bind(const std::string& addr);
+1 -3
View File
@@ -20,9 +20,7 @@ TARGET_LINK_LIBRARIES(CBORStream2FrameSerialize tinycbor)
ADD_LIBRARY(ImagePusher STATIC
ImagePusher.cpp ImagePusher.h
TestImagePusher.cpp TestImagePusher.h
ZMQStream2PusherGroup.cpp ZMQStream2PusherGroup.h
ZMQStream2Pusher.cpp
ZMQStream2Pusher.h
ZMQStream2Pusher.cpp ZMQStream2Pusher.h
DumpCBORToFilePusher.cpp
DumpCBORToFilePusher.h)
+96 -45
View File
@@ -1,66 +1,117 @@
// Copyright (2019-2024) Paul Scherrer Institute
#include "ZMQStream2Pusher.h"
#include "CBORStream2Serializer.h"
ZMQStream2Pusher::ZMQStream2Pusher(ZMQContext &context, const std::string &addr, int32_t send_buffer_high_watermark,
int32_t send_buffer_size)
: socket(context, ZMQSocketType::Push) {
Bind(addr, send_buffer_high_watermark, send_buffer_size);
}
ZMQStream2Pusher::ZMQStream2Pusher(const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size)
: serialization_buffer(256*1024*1024),
serializer(serialization_buffer.data(), serialization_buffer.size()),
preview_counter(std::chrono::seconds(1)) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided");
ZMQStream2Pusher::ZMQStream2Pusher(const std::string &addr, int32_t send_buffer_high_watermark,
int32_t send_buffer_size)
: context(std::make_unique<ZMQContext>()),
socket(*context, ZMQSocketType::Push) {
Bind(addr, send_buffer_high_watermark, send_buffer_size);
}
void ZMQStream2Pusher::Bind(const std::string &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (send_buffer_size > 0)
socket.SendBufferSize(send_buffer_size);
if (send_buffer_high_watermark > 0)
socket.SendWaterMark(send_buffer_high_watermark);
socket.SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response
socket.Bind(addr);
}
void ZMQStream2Pusher::StartDataCollection(StartMessage &message) {
size_t approx_size = 1024*1024;
for (const auto &x : message.pixel_mask)
approx_size += x.size;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(message);
if (!socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr " + GetAddress());
for (const auto &a : addr) {
auto s = std::make_unique<ZMQSocket>(ZMQSocketType::Push);
if (send_buffer_size > 0)
s->SendBufferSize(send_buffer_size);
if (send_buffer_high_watermark > 0)
s->SendWaterMark(send_buffer_high_watermark);
s->SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response
s->Bind(a);
socket.emplace_back(std::move(s));
}
}
bool ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
return socket.Send(image_data, image_size, false);
if (preview_socket) {
if (preview_counter.GeneratePreview())
preview_socket->Send(image_data, image_size, false);
}
if (!socket.empty()) {
auto socket_number = (image_number / images_per_file) % socket.size();
return socket[socket_number]->Send(image_data, image_size, false);
} else
return false;
}
void ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
socket.SendZeroCopy(image_data,image_size, z);
ZeroCopyReturnValue *z) {
if (preview_socket) {
if (preview_counter.GeneratePreview())
preview_socket->Send(image_data, image_size, false);
}
if (!socket.empty()) {
auto socket_number = (image_number / images_per_file) % socket.size();
socket[socket_number]->SendZeroCopy(image_data, image_size, z);
} else
z->release();
}
bool ZMQStream2Pusher::EndDataCollection(const EndMessage &message) {
std::vector<uint8_t> serialization_buffer(80 * 1024 * 1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
void ZMQStream2Pusher::StartDataCollection(StartMessage& message) {
if (message.images_per_file < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Images per file cannot be zero or negative");
images_per_file = message.images_per_file;
serializer.SerializeSequenceEnd(message);
return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking
serializer.SerializeSequenceStart(message);
for (auto &s: socket) {
if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr "
+ s->GetEndpointName());
if (message.write_master_file) {
message.write_master_file = false;
serializer.SerializeSequenceStart(message);
}
}
if (preview_socket)
preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
}
bool ZMQStream2Pusher::SendCalibration(const CompressedImage &message) {
std::vector<uint8_t> serialization_buffer(80 * 1024 * 1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
if (socket.empty())
return false;
serializer.SerializeCalibration(message);
return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking
return socket[0]->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
}
std::string ZMQStream2Pusher::GetAddress() {
return socket.GetEndpointName();
bool ZMQStream2Pusher::EndDataCollection(const EndMessage& message) {
serializer.SerializeSequenceEnd(message);
bool ret = true;
for (auto &s: socket) {
if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true))
ret = false;
}
if (preview_socket)
preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
return ret;
}
std::vector<std::string> ZMQStream2Pusher::GetAddress() {
std::vector<std::string> ret;
for (auto &p: socket)
ret.push_back(p->GetEndpointName());
return ret;
}
ZMQStream2Pusher &ZMQStream2Pusher::PreviewSocket(const std::string &addr) {
preview_socket = std::make_unique<ZMQSocket>(ZMQSocketType::Pub);
preview_socket->Bind(addr);
return *this;
}
std::string ZMQStream2Pusher::GetPreviewAddress() {
if (preview_socket)
return preview_socket->GetEndpointName();
else
return "";
}
+24 -14
View File
@@ -3,30 +3,40 @@
#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#define JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
#include <mutex>
#include "ImagePusher.h"
#include "../common/ZMQWrappers.h"
#include "../preview/PreviewCounter.h"
class ZMQStream2Pusher : public ImagePusher {
std::unique_ptr<ZMQContext> context;
ZMQSocket socket;
public:
ZMQStream2Pusher(ZMQContext& context,
const std::string& addr,
int32_t send_buffer_high_watermark = -1,
int32_t send_buffer_size = -1);
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
explicit ZMQStream2Pusher(const std::string& addr,
std::vector<std::unique_ptr<ZMQSocket>> socket;
std::unique_ptr<ZMQSocket> preview_socket;
PreviewCounter preview_counter;
int64_t images_per_file = 1;
public:
explicit ZMQStream2Pusher(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1,
int32_t send_buffer_size = -1);
void Bind(const std::string& addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size);
ZMQStream2Pusher& PreviewSocket(const std::string& addr);
std::string GetPreviewAddress();
std::vector<std::string> GetAddress();
// Strictly serial, as order of these is important
void StartDataCollection(StartMessage& message) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool EndDataCollection(const EndMessage &message) override;
bool EndDataCollection(const EndMessage& message) override;
bool SendCalibration(const CompressedImage& message) override;
std::string GetAddress();
// Thread-safe
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
-75
View File
@@ -1,75 +0,0 @@
// Copyright (2019-2024) Paul Scherrer Institute
#include "ZMQStream2PusherGroup.h"
#include "CBORStream2Serializer.h"
ZMQStream2PusherGroup::ZMQStream2PusherGroup(ZMQContext &zmq_context, const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"No writer ZMQ address provided");
for (const auto &a : addr)
pusher.emplace_back(std::make_unique<ZMQStream2Pusher>
(zmq_context, a, send_buffer_high_watermark, send_buffer_size));
}
ZMQStream2PusherGroup::ZMQStream2PusherGroup(const std::vector<std::string> &addr,
int32_t send_buffer_high_watermark, int32_t send_buffer_size) {
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"No writer ZMQ address provided");
for (const auto &a : addr)
pusher.emplace_back(std::make_unique<ZMQStream2Pusher>
(a, send_buffer_high_watermark, send_buffer_size));
}
bool ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
if (!pusher.empty()) {
auto socket_number = (image_number / images_per_file) % pusher.size();
return pusher[socket_number]->SendImage(image_data, image_size, image_number);
} else
return false;
}
void ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number,
ZeroCopyReturnValue *z) {
if (!pusher.empty()) {
auto socket_number = (image_number / images_per_file) % pusher.size();
pusher[socket_number]->SendImage(image_data, image_size, image_number, z);
}
}
void ZMQStream2PusherGroup::StartDataCollection(StartMessage& message) {
if (message.images_per_file < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Images per file cannot be zero or negative");
images_per_file = message.images_per_file;
for (auto &p: pusher) {
p->StartDataCollection(message);
message.write_master_file = false;
}
}
bool ZMQStream2PusherGroup::SendCalibration(const CompressedImage &message) {
if (pusher.empty())
return false;
return pusher[0]->SendCalibration(message);
}
bool ZMQStream2PusherGroup::EndDataCollection(const EndMessage& message) {
bool ret = true;
for (auto &p: pusher) {
if (!p->EndDataCollection(message))
ret = false;
}
return ret;
}
std::vector<std::string> ZMQStream2PusherGroup::GetAddress() {
std::vector<std::string> ret;
for (auto &p: pusher)
ret.push_back(p->GetAddress());
return ret;
}
-29
View File
@@ -1,29 +0,0 @@
// Copyright (2019-2024) Paul Scherrer Institute
#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H
#define JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H
#include "ImagePusher.h"
#include "ZMQStream2Pusher.h"
#include "../common/ZMQWrappers.h"
class ZMQStream2PusherGroup : public ImagePusher {
std::vector<std::unique_ptr<ZMQStream2Pusher>> pusher;
int64_t images_per_file = 1;
public:
ZMQStream2PusherGroup(ZMQContext &context, const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
// High performance implementation, where each socket has dedicated ZMQ context
explicit ZMQStream2PusherGroup(const std::vector<std::string>& addr,
int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1);
void StartDataCollection(StartMessage& message) override;
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
bool EndDataCollection(const EndMessage& message) override;
bool SendCalibration(const CompressedImage& message) override;
std::vector<std::string> GetAddress();
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H
+13 -18
View File
@@ -28,7 +28,7 @@ class PreviewImage extends Component<MyProps, MyState> {
show_indexed: false,
resolution_ring: 0.5
},
s_url: "",
s_url: null,
update: true,
connection_error: true
}
@@ -124,9 +124,12 @@ class PreviewImage extends Component<MyProps, MyState> {
clearInterval(this.interval);
}
preview() {
return <div><br/>
<Stack spacing={2} direction="row" sx={{ mb: 1 }} alignItems="center">
render() {
return <Paper sx={{height: 1050, width: 850, m: 2}}
component={Stack}
direction="column">
<br/>
<Stack spacing={2} direction="row" sx={{mb: 1}} alignItems="center">
&nbsp;&nbsp;<strong>Preview image</strong>
<Switch disabled={this.state.connection_error} checked={this.state.update}
onChange={this.updateToggle} name="Update"/>
@@ -140,13 +143,13 @@ class PreviewImage extends Component<MyProps, MyState> {
<Switch disabled={this.state.connection_error} checked={this.state.settings.show_indexed}
onChange={this.showIndexedToggle} name="Show ROI"/>
Show only indexed images&nbsp;&nbsp;&nbsp;
<Box sx={{ width: 200 }}>
<Box sx={{width: 200}}>
<Slider disabled={this.state.connection_error}
value={Number(this.state.settings.saturation)} min={1} max={80}
onChange={this.setSaturation} valueLabelDisplay="auto"/> <br/>Saturation value
</Box>
&nbsp;&nbsp;&nbsp;
<Box sx={{ width: 200 }}>
<Box sx={{width: 200}}>
<Slider disabled={this.state.connection_error}
value={(this.state.settings.resolution_ring === undefined) ? 0.5 : Number(this.state.settings.resolution_ring)}
min={0.5} max={5.0} step={0.1}
@@ -155,8 +158,7 @@ class PreviewImage extends Component<MyProps, MyState> {
</Stack>
<br/>
{
this.state.s_url !== null ?
{(!this.state.connection_error && (this.state.s_url !== null)) ?
<Stack
direction="row"
justifyContent="center"
@@ -164,20 +166,13 @@ class PreviewImage extends Component<MyProps, MyState> {
>
<TransformWrapper>
<TransformComponent>
<img src={this.state.s_url} alt="Live preview" style={{maxWidth: "100%", maxHeight: 900}}/>
<img src={this.state.s_url} alt="Live preview"
style={{maxWidth: "100%", maxHeight: 900}}/>
</TransformComponent>
</TransformWrapper>
</Stack>: <div/>
</Stack> : <div>Preview not available</div>
}
<br/>
</div>
}
render() {
return <Paper sx={{height: 1050, width: 850, m: 2}}
component={Stack}
direction="column">
{(!this.state.connection_error && (this.state.s_url !== null)) ? this.preview() : "Preview not available"}
</Paper>
}
}
+8 -4
View File
@@ -27,8 +27,12 @@ MXAnalyzer::MXAnalyzer(const DiffractionExperiment &in_experiment)
: experiment(in_experiment) {
auto uc = experiment.GetUnitCell();
if (uc) {
do_indexing = true;
indexer.Setup(uc.value());
try {
indexer = std::make_unique<IndexerWrapper>();
indexer->Setup(uc.value());
} catch (const std::exception &e) {
throw JFJochException(JFJochExceptionCategory::GPUCUDAError, e.what());
}
}
if (experiment.IsSpotFindingEnabled())
find_spots = true;
@@ -73,13 +77,13 @@ void MXAnalyzer::Process(DataMessage &message, const SpotFindingSettings& settin
for (const auto &spot: spots_out)
message.spots.push_back(spot);
if (do_indexing && settings.indexing) {
if (indexer && settings.indexing) {
std::vector<Coord> recip;
recip.reserve(spots_out.size());
for (const auto &i: spots_out)
recip.push_back(i.ReciprocalCoord(experiment));
auto indexer_result = indexer.Run(recip, settings.indexing_tolerance);
auto indexer_result = indexer->Run(recip, settings.indexing_tolerance);
if (!indexer_result.empty()) {
message.indexing_result = true;
+1 -2
View File
@@ -9,8 +9,7 @@
class MXAnalyzer {
const DiffractionExperiment &experiment;
IndexerWrapper indexer;
bool do_indexing = false;
std::unique_ptr<IndexerWrapper> indexer;
bool find_spots = false;
std::vector<DiffractionSpot> spots;
constexpr static const float spot_distance_threshold_pxl = 2.0f;
+11
View File
@@ -33,6 +33,7 @@ constexpr const static rgb gray = {.r = 0xbe, .g = 0xbe, .b = 0xbe};
PreviewImage::PreviewImage(const DiffractionExperiment &in_experiment) :
experiment(in_experiment),
initialized(false),
xpixel(experiment.GetXPixelsNum()),
ypixel(experiment.GetYPixelsNum()),
beam_x(experiment.GetBeamX_pxl()),
@@ -47,6 +48,7 @@ void PreviewImage::UpdateImage(const void *in_uncompressed_image,
const std::vector<SpotToSave> &in_spots) {
if (counter.GeneratePreview()) {
std::unique_lock<std::mutex> ul(m);
initialized = true;
memcpy(uncompressed_image.data(), in_uncompressed_image, xpixel * ypixel * pixel_depth_bytes);
spots = in_spots;
}
@@ -109,6 +111,10 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons
{
// JPEG compression is outside the critical loop protected by m
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
if (!pixel_is_signed) {
if (pixel_depth_bytes == 2)
v = GenerateRGB<uint16_t>((uint16_t *) uncompressed_image.data(), xpixel * ypixel,
@@ -141,6 +147,9 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons
std::string PreviewImage::GenerateTIFF() const {
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
std::string s = WriteTIFFToString(const_cast<uint8_t *>(uncompressed_image.data()),
xpixel, ypixel, pixel_depth_bytes, pixel_is_signed);
return s;
@@ -167,6 +176,8 @@ std::vector<uint16_t> GenerateDioptasPreview(const void* input, size_t xpixel, s
std::string PreviewImage::GenerateTIFFDioptas() const {
std::unique_lock<std::mutex> ul(m);
if (!initialized)
return {};
std::vector<uint16_t> vec;
if (pixel_is_signed) {
+1
View File
@@ -29,6 +29,7 @@ class PreviewImage {
mutable std::mutex m;
DiffractionExperiment experiment;
bool initialized;
const ROIMap roi_map;
std::vector<uint8_t> uncompressed_image;
std::vector<SpotToSave> spots;
+7 -5
View File
@@ -257,18 +257,20 @@ void JFJochReceiver::RetrievePedestal() {
void JFJochReceiver::FrameTransformationThread() {
std::unique_ptr<MXAnalyzer> analyzer;
try {
numa_policy.Bind();
analyzer = std::make_unique<MXAnalyzer>(experiment);
} catch (const JFJochException &e) {
frame_transformation_ready.count_down();
logger.Error("HW bind error {}", e.what());
logger.Error("Thread setup error {}", e.what());
Cancel(e);
return;
}
FrameTransformation transformation(experiment);
MXAnalyzer analyzer(experiment);
frame_transformation_ready.count_down();
uint16_t az_int_min_bin = std::floor(az_int_mapping.QToBin(experiment.GetLowQForBkgEstimate_recipA()));
@@ -313,7 +315,7 @@ void JFJochReceiver::FrameTransformationThread() {
adu_histogram_module[module_abs_number].Add(*output);
az_int_profile_image.Add(*output);
analyzer.ReadFromFPGA(output, local_spot_finding_settings, module_abs_number);
analyzer->ReadFromFPGA(output, local_spot_finding_settings, module_abs_number);
transformation.ProcessModule(output, d);
} else
@@ -332,7 +334,7 @@ void JFJochReceiver::FrameTransformationThread() {
continue;
}
analyzer.Process(message, local_spot_finding_settings);
analyzer->Process(message, local_spot_finding_settings);
message.receiver_free_send_buf = send_buf_ctrl.GetAvailBufLocations();
message.az_int_profile = az_int_profile_image.GetResult();
+1 -1
View File
@@ -2,7 +2,7 @@
#include "JFJochReceiverTest.h"
#include "JFJochReceiverService.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../frame_serialize/TestImagePusher.h"
#define STORAGE_CELL_FOR_TEST 11
+2 -4
View File
@@ -221,8 +221,6 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") {
TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") {
{
ZMQContext c;
RegisterHDF5Filter();
DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36));
std::vector<SpotToSave> spots;
@@ -233,10 +231,10 @@ TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") {
x.FillMessage(start_message);
HDF5Writer file_set(start_message);
file_set.SetupSocket(c, "ipc://#1");
file_set.SetupSocket("ipc://#1");
std::vector<uint16_t> image(x.GetPixelsNum());
ZMQSocket s(c, ZMQSocketType::Sub);
ZMQSocket s(ZMQSocketType::Sub);
s.Connect("ipc://#1");
s.SubscribeAll();
s.ReceiveTimeout(std::chrono::seconds(5));
+7 -15
View File
@@ -52,13 +52,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]
aq_devices.Add(std::move(test));
ZMQContext context;
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
context.NumThreads(4);
JFJochReceiverService service(aq_devices, logger, pusher);
service.NumThreads(nthreads);
@@ -127,11 +124,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_min_pix_2", "[JFJoc
test->SetInternalGeneratorFrame((uint16_t *) image_raw_geom.data() + m * RAW_MODULE_SIZE, m);
aq_devices.Add(std::move(test));
ZMQContext context;
context.NumThreads(4);
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
JFJochReceiverService service(aq_devices, logger, pusher);
@@ -241,11 +235,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") {
aq_devices.Add(std::move(test));
ZMQContext context;
context.NumThreads(4);
ZMQStream2Pusher pusher(context, "ipc://*");
StreamWriter writer(context, logger, pusher.GetAddress());
ZMQStream2Pusher pusher({"ipc://*"});
StreamWriter writer(logger, pusher.GetAddress()[0]);
auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer);
JFJochReceiverService service(aq_devices, logger, pusher);
@@ -269,6 +260,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") {
REQUIRE(plot.size() == 2);
CHECK(plot[0].title == "roi0");
REQUIRE(!plot[0].x.empty());
CHECK(plot[0].x[0] == 0);
CHECK(plot[0].y[0] == roi_value);
CHECK(plot[1].title == "roi1");
+5 -1
View File
@@ -58,7 +58,6 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") {
{.x = 1200, .y = 500, .indexed = true}
};
PreviewImage image(experiment);
image.UpdateImage(image_conv_2.data(), spots);
PreviewJPEGSettings preview_settings{
.saturation_value = 5,
@@ -66,6 +65,11 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") {
.show_spots = true
};
REQUIRE(image.GenerateJPEG(preview_settings).empty());
image.UpdateImage(image_conv_2.data(), spots);
std::string s;
REQUIRE_NOTHROW(s = image.GenerateJPEG(preview_settings));
std::ofstream f("lyso_diff.jpeg", std::ios::binary);
+3 -5
View File
@@ -4,15 +4,13 @@
#include <filesystem>
#include "../writer/StreamWriter.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#include "../receiver/JFJochReceiverService.h"
TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
RegisterHDF5Filter();
Logger logger("test");
ZMQContext context;
std::string zmq_addr = "ipc://*";
DiffractionExperiment x(DetectorGeometry(2));
x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5)
@@ -23,7 +21,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
for (int i = 0; i < x.GetDataStreamsNum(); i++)
aq_devices.AddHLSDevice(64);
ZMQStream2PusherGroup pusher (context, {zmq_addr});
ZMQStream2Pusher pusher ({"ipc://*"});
JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher);
JFJochReceiverOutput receiver_output;
@@ -32,7 +30,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") {
REQUIRE(x.GetImageNum() == 5);
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == 1);
REQUIRE_NOTHROW(writer = std::make_unique<StreamWriter>(context, logger, pusher_addr[0]));
REQUIRE_NOTHROW(writer = std::make_unique<StreamWriter>(logger, pusher_addr[0]));
CHECK (writer->GetStatistics().state == StreamWriterState::Idle);
REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr));
+86 -15
View File
@@ -3,7 +3,7 @@
#include <random>
#include <catch2/catch_all.hpp>
#include "../writer/ZMQImagePuller.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
void test_puller(ZMQImagePuller *puller,
const DiffractionExperiment& x,
@@ -53,7 +53,6 @@ void test_puller(ZMQImagePuller *puller,
TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
const size_t nframes = 256;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -71,12 +70,10 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
for (auto &i: image1) i = dist(g1);
std::string zmq_addr = "ipc://*";
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
ZMQImagePuller puller(context);
ZMQStream2PusherGroup pusher(context, {zmq_addr});
ZMQImagePuller puller;
ZMQStream2Pusher pusher({"ipc://*"});
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
@@ -119,11 +116,87 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
REQUIRE(diff_content[0] == 0);
}
TEST_CASE("ZMQImageCommTest_1Writer_Preview","[ZeroMQ]") {
const size_t nframes = 1;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
.ImagesPerTrigger(nframes);
std::vector<DiffractionSpot> empty_spot_vector;
std::vector<float> empty_rad_int_profile;
REQUIRE(x.GetImageNum() == nframes);
std::mt19937 g1(1387);
std::uniform_int_distribution<uint16_t> dist;
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
for (auto &i: image1) i = dist(g1);
// Puller needs to be declared first, but both objects need to exist till communication finished
ZMQImagePuller puller;
ZMQStream2Pusher pusher({"ipc://*"});
REQUIRE(pusher.GetPreviewAddress().empty());
pusher.PreviewSocket("ipc://*");
REQUIRE(!pusher.GetPreviewAddress().empty());
ZMQSocket preview_sub_socket(ZMQSocketType::Sub);
preview_sub_socket.Connect(pusher.GetPreviewAddress());
preview_sub_socket.Subscribe("");
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
auto pusher_addr = pusher.GetAddress();
puller.Connect(pusher_addr[0]);
std::thread sender_thread = std::thread([&] {
std::vector<uint8_t> serialization_buffer(16*1024*1024);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
StartMessage message {
.images_per_file = 16,
.write_master_file = true
};
EndMessage end_message{};
pusher.StartDataCollection(message);
for (int i = 0; i < nframes; i++) {
DataMessage data_message;
data_message.number = i;
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
serializer.SerializeImage(data_message);
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
}
pusher.EndDataCollection(end_message);
});
std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0,
std::ref(diff_split), std::ref(diff_size), std::ref(diff_content),
std::ref(nimages));
sender_thread.join();
puller_thread.join();
puller.Disconnect();
REQUIRE(nimages[0] == nframes);
REQUIRE(diff_size[0] == 0);
REQUIRE(diff_content[0] == 0);
ZMQMessage msg;
REQUIRE(preview_sub_socket.Receive(msg));
REQUIRE(preview_sub_socket.Receive(msg));
REQUIRE(preview_sub_socket.Receive(msg));
}
TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
const size_t nframes = 256;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -148,7 +221,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("ipc://*");
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
@@ -156,7 +229,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == 2);
for (int i = 0; i < npullers; i++) {
puller.push_back(std::make_unique<ZMQImagePuller>(context));
puller.push_back(std::make_unique<ZMQImagePuller>());
puller[i]->Connect(pusher_addr[i]);
}
@@ -213,7 +286,6 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
const size_t nframes = 255;
ZMQContext context;
Logger logger("test");
DiffractionExperiment x(DetectorGeometry(1));
x.Mode(DetectorMode::Raw);
@@ -238,14 +310,14 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
for (int i = 0; i < npullers; i++)
zmq_addr.push_back("ipc://*");
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
auto pusher_addr = pusher.GetAddress();
REQUIRE(pusher_addr.size() == npullers);
// Puller needs to be declared first, but both objects need to exist till communication finished
// TODO: ImageSender should not allow if there are still completions to be done
std::vector<std::unique_ptr<ZMQImagePuller> > puller;
for (int i = 0; i < npullers; i++) {
puller.push_back(std::make_unique<ZMQImagePuller>(context));
puller.push_back(std::make_unique<ZMQImagePuller>());
puller[i]->Connect(pusher_addr[i]);
}
@@ -310,8 +382,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") {
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
.ImagesPerTrigger(nframes);
ZMQContext context;
ZMQImagePuller puller(context);
ZMQImagePuller puller;
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
std::vector<uint16_t> image1(x.GetPixelsNum());
@@ -325,7 +396,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") {
}
TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") {
ZMQStream2PusherGroup pusher({"ipc://*"});
ZMQStream2Pusher pusher({"ipc://*"});
StartMessage msg;
REQUIRE_THROWS(pusher.StartDataCollection(msg));
+2 -2
View File
@@ -6,7 +6,7 @@
#include "../common/Logger.h"
#include "../receiver/FrameTransformation.h"
#include "../common/RawToConvertedGeometry.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
#include "../frame_serialize/ZMQStream2Pusher.h"
#define BASE_TCP_PORT 8000
@@ -54,7 +54,7 @@ int main(int argc, char **argv) {
for (int i = 0; i < nsockets; i++)
zmq_addr.emplace_back("tcp://0.0.0.0:" + std::to_string(BASE_TCP_PORT + i));
ZMQStream2PusherGroup pusher(context, zmq_addr);
ZMQStream2Pusher pusher(zmq_addr);
FrameTransformation transformation(x);
+2 -2
View File
@@ -74,8 +74,8 @@ void HDF5Writer::AddStats(const std::optional<HDF5DataFileStatistics>& s) {
}
}
void HDF5Writer::SetupSocket(ZMQContext &c, const std::string &addr) {
socket = std::make_unique<ZMQSocket>(c, ZMQSocketType::Pub);
void HDF5Writer::SetupSocket(const std::string &addr) {
socket = std::make_unique<ZMQSocket>(ZMQSocketType::Pub);
socket->Bind(addr);
}
+1 -1
View File
@@ -25,7 +25,7 @@ public:
explicit HDF5Writer(const StartMessage &request);
void Write(const DataMessage& msg);
std::vector<HDF5DataFileStatistics> Finalize();
void SetupSocket(ZMQContext &c, const std::string &addr);
void SetupSocket(const std::string &addr);
std::optional<std::string> GetZMQAddr();
};
+3 -5
View File
@@ -6,13 +6,11 @@
#include "HDF5NXmx.h"
#include "MakeDirectory.h"
StreamWriter::StreamWriter(ZMQContext &in_context,
Logger &in_logger,
StreamWriter::StreamWriter(Logger &in_logger,
const std::string &zmq_addr,
const std::string &repub_address,
const std::string &in_file_done_address)
: zmq_context(in_context),
image_puller(in_context, repub_address),
: image_puller(repub_address),
logger(in_logger),
file_done_address(in_file_done_address) {
image_puller.Connect(zmq_addr);
@@ -46,7 +44,7 @@ void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
HDF5Writer writer(*image_puller_output.cbor->start_message);
if (!file_done_address.empty())
writer.SetupSocket(zmq_context, file_done_address);
writer.SetupSocket(file_done_address);
std::unique_ptr<NXmx> master_file;
if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value())
+1 -3
View File
@@ -24,7 +24,6 @@ struct StreamWriterOutput {
};
class StreamWriter {
ZMQContext &zmq_context;
std::string file_done_address;
StreamWriterState state = StreamWriterState::Idle;
@@ -42,8 +41,7 @@ class StreamWriter {
void CollectImages(std::vector<HDF5DataFileStatistics> &v);
bool WaitForImage();
public:
StreamWriter(ZMQContext& context,
Logger &logger,
StreamWriter(Logger &logger,
const std::string& zmq_addr,
const std::string& repub_address = "",
const std::string& file_done_address = "");
+3 -3
View File
@@ -2,13 +2,13 @@
#include "ZMQImagePuller.h"
ZMQImagePuller::ZMQImagePuller(ZMQContext &context, const std::string &repub_address) :
socket (context, ZMQSocketType::Pull) {
ZMQImagePuller::ZMQImagePuller(const std::string &repub_address) :
socket (ZMQSocketType::Pull) {
socket.ReceiveWaterMark(ReceiverWaterMark);
socket.ReceiveTimeout(ReceiveTimeout);
if (!repub_address.empty()) {
repub_socket = std::make_unique<ZMQSocket>(context, ZMQSocketType::Push);
repub_socket = std::make_unique<ZMQSocket>(ZMQSocketType::Push);
repub_socket->SendWaterMark(100);
repub_socket->SendTimeout(std::chrono::milliseconds(100));
repub_socket->Bind(repub_address);
+1 -1
View File
@@ -42,7 +42,7 @@ class ZMQImagePuller {
void RepubThread();
Logger logger{"ZMQImagePuller"};
public:
explicit ZMQImagePuller(ZMQContext &context, const std::string &repub_address = "");
explicit ZMQImagePuller(const std::string &repub_address = "");
~ZMQImagePuller();
void Connect(const std::string &in_address);
void Disconnect();
+1 -1
View File
@@ -117,7 +117,7 @@ int main(int argc, char **argv) {
ZMQContext context;
Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port));
writer = new StreamWriter(context, logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr);
writer = new StreamWriter(logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr);
httpEndpoint = new Pistache::Http::Endpoint(addr);
auto router = std::make_shared<Pistache::Rest::Router>();