ZMQPreviewPublisher: Add spots to preview message + allow for frame number management within the ZMQPreviewPublisher

This commit is contained in:
2023-05-18 22:36:43 +02:00
parent ff92984fcc
commit eaccdf67b7
8 changed files with 163 additions and 62 deletions

View File

@@ -11,6 +11,11 @@ ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string&
}
void ZMQPreviewPublisher::Start(const DiffractionExperiment &experiment, const JFCalibration &calibration) {
{
std::unique_lock<std::mutex> ul(m);
stride = experiment.GetPreviewStride();
current_part = -1;
}
auto mask = calibration.CalculateNexusMask(experiment);
JFJochProtoBuf::PreviewFrame frame;
frame.set_image_number(-1);
@@ -23,9 +28,19 @@ void ZMQPreviewPublisher::Start(const DiffractionExperiment &experiment, const J
void ZMQPreviewPublisher::Stop(const DiffractionExperiment& experiment) {}
void ZMQPreviewPublisher::Publish(const DiffractionExperiment& experiment, const int16_t* image_data, uint32_t image_number) {
void ZMQPreviewPublisher::Publish(const DiffractionExperiment& experiment, const int16_t* image_data, const DataMessage &message) {
{
std::unique_lock<std::mutex> ul(m);
int64_t part = message.number / stride;
if (current_part >= part)
return;
else
current_part = part;
}
JFJochProtoBuf::PreviewFrame frame;
frame.set_image_number(image_number);
frame.set_image_number(message.number);
frame.set_total_images(experiment.GetImageNum());
frame.set_wavelength_a(experiment.GetWavelength_A());
frame.set_beam_x_pxl(experiment.GetBeamX_pxl());
@@ -39,6 +54,13 @@ void ZMQPreviewPublisher::Publish(const DiffractionExperiment& experiment, const
frame.set_data(image_data, experiment.GetPixelsNum() * sizeof(int16_t));
for (const auto &s: message.spots) {
auto fr = frame.add_spots();
fr->set_x(s.x);
fr->set_y(s.y);
fr->set_indexed(s.indexed);
}
SetPreviewImage(frame);
socket.Send(grpcToJson(frame));
}

View File

@@ -9,17 +9,22 @@
#include "ZMQWrappers.h"
#include "DiffractionExperiment.h"
#include "../jungfrau/JFCalibration.h"
#include "../frame_serialize/ImageMessage.h"
class ZMQPreviewPublisher {
ZMQSocket socket;
int64_t stride;
int64_t current_part;
mutable std::mutex m;
mutable std::mutex frame_mutex;
JFJochProtoBuf::PreviewFrame saved_frame;
void SetPreviewImage(const JFJochProtoBuf::PreviewFrame &frame);
public:
ZMQPreviewPublisher(ZMQContext& context, const std::string& addr);
void Start(const DiffractionExperiment& experiment, const JFCalibration &calibration);
void Publish(const DiffractionExperiment& experiment, const int16_t* image_data, uint32_t image_number);
void Publish(const DiffractionExperiment& experiment, const int16_t* image_data, const DataMessage &message);
void Stop(const DiffractionExperiment& experiment);
JFJochProtoBuf::PreviewFrame GetPreviewImage() const;
};

View File

@@ -9,6 +9,7 @@
#include <map>
#include <vector>
#include "../compression/CompressionAlgorithmEnum.h"
#include "../common/SpotToSave.h"
struct CBORImage {
const uint8_t *data;

View File

@@ -395,6 +395,12 @@ message DataProcessingSettings {
float bkg_estimate_high_q = 9;
}
message PreviewFrameSpot {
float x = 1;
float y = 2;
bool indexed = 3;
}
message PreviewFrame {
int64 image_number = 1;
int64 total_images = 2;
@@ -409,6 +415,7 @@ message PreviewFrame {
int64 pixel_depth = 11;
reserved 12;
bytes data = 13;
repeated PreviewFrameSpot spots = 14;
}
// Broker

File diff suppressed because one or more lines are too long

View File

@@ -349,10 +349,12 @@ void JFJochReceiver::FrameTransformationThread() {
bool send_image = false; // We send image if at least one module was collected in full
if (GPUImageAnalysis::GPUPresent() && (spotfinder_stride > 0) && (image_number % spotfinder_stride == 0)) {
calculate_spots = true;
if (rad_int_mapping)
send_bkg_estimate = true;
if (GPUImageAnalysis::GPUPresent()) {
if (((spotfinder_stride > 0) && (image_number % spotfinder_stride == 0)) || send_preview) {
calculate_spots = true;
if (rad_int_mapping)
send_bkg_estimate = true;
}
}
if (experiment.GetSummation() >= threaded_summation_threshold) {
@@ -421,11 +423,6 @@ void JFJochReceiver::FrameTransformationThread() {
if (send_bkg_estimate)
spot_finder->RunRadialIntegration();
if (send_preview)
preview_publisher->Publish(experiment,
transformation.GetPreview16BitImage(),
image_number);
if (calculate_spots) {
spot_finder->GetSpotFinderResults(experiment, GetDataProcessingSettings(), spots);
for (const auto &spot: spots)
@@ -472,6 +469,11 @@ void JFJochReceiver::FrameTransformationThread() {
spot_finder->GetRadialIntegrationCount());
}
if (send_preview)
preview_publisher->Publish(experiment,
transformation.GetPreview16BitImage(),
message);
if (push_images_to_writer) {
message.receiver_available_send_buffers = GetAvailableSendBuffers();

View File

@@ -723,7 +723,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
fpga_receiver.PreviewPublisher(&preview);
fpga_receiver.PreviewPublisher(&preview).NumThreads(1);
ZMQSocket rcv_preview_socket(zmq_context, ZMQSocketType::Sub);
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
@@ -844,7 +844,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
fpga_receiver.PreviewPublisher(&preview);
fpga_receiver.PreviewPublisher(&preview).NumThreads(1);
ZMQSocket rcv_preview_socket(zmq_context, ZMQSocketType::Sub);
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));
@@ -963,7 +963,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer_binning2x2", "[JFJoc
JFJochWriterService writer(zmq_context, logger);
ZMQPreviewPublisher preview(zmq_context, "inproc://#2");
fpga_receiver.PreviewPublisher(&preview);
fpga_receiver.PreviewPublisher(&preview).NumThreads(1);
ZMQSocket rcv_preview_socket(zmq_context, ZMQSocketType::Sub);
REQUIRE_NOTHROW(rcv_preview_socket.Connect("inproc://#2"));

View File

@@ -5,7 +5,7 @@
#include "../common/ZMQPreviewPublisher.h"
#include "../common/jsonToGrpc.h"
TEST_CASE("ZMQPreviewPublisher","[ZSTD]") {
TEST_CASE("ZMQPreviewPublisher","[ZMQ]") {
ZMQContext context;
ZMQPreviewPublisher publisher(context, "inproc://#5");
@@ -26,7 +26,12 @@ TEST_CASE("ZMQPreviewPublisher","[ZSTD]") {
for (auto &i: image)
i = distribution(g1);
publisher.Publish(experiment, image.data(), 546);
std::vector<SpotToSave> spots;
spots.push_back(SpotToSave{.x = 7, .y = 8, .intensity = 34, .indexed = false});
spots.push_back(SpotToSave{.x = 37, .y = 48, .intensity = 123, .indexed = true});
DataMessage message{.number = 564, .spots = spots};
publisher.Publish(experiment, image.data(), message);
std::string s;
@@ -43,14 +48,70 @@ TEST_CASE("ZMQPreviewPublisher","[ZSTD]") {
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.pixel_depth() == 2);
REQUIRE(frame.image_number() == 546);
REQUIRE(frame.image_number() == 564);
std::vector<char> image_out = {frame.data().begin(), frame.data().end()};
REQUIRE(memcmp(image.data(), image_out.data(), experiment.GetPixelsNum() * experiment.GetPixelDepth()) == 0);
REQUIRE(frame.spots_size() == 2);
REQUIRE(frame.spots(0).x() == 7);
REQUIRE(!frame.spots(0).indexed());
REQUIRE(frame.spots(1).y() == 48);
REQUIRE(frame.spots(1).indexed());
REQUIRE(socket.Receive(s, false) < 0);
}
TEST_CASE("ZMQPreviewPublisher_GetPreviewImage","[ZSTD]") {
TEST_CASE("ZMQPreviewPublisher_FrameNumbers","[ZMQ]") {
ZMQContext context;
ZMQPreviewPublisher publisher(context, "inproc://#5");
ZMQSocket socket(context, ZMQSocketType::Sub);
socket.Connect("inproc://#5");
socket.SubscribeAll();
DiffractionExperiment experiment(DetectorGeometry(1, 1, 0, 0, false));
experiment.PreviewPeriod(std::chrono::milliseconds(10)).ImageTimeUs(std::chrono::milliseconds(1));
REQUIRE(experiment.GetPreviewStride() == 10);
JFCalibration calibration(experiment);
publisher.Start(experiment, calibration);
std::vector<int16_t> image(experiment.GetPixelsNum());
publisher.Publish(experiment, image.data(), DataMessage{.number = 0});
publisher.Publish(experiment, image.data(), DataMessage{.number = 9});
publisher.Publish(experiment, image.data(), DataMessage{.number = 12});
publisher.Publish(experiment, image.data(), DataMessage{.number = 10});
publisher.Publish(experiment, image.data(), DataMessage{.number = 25});
publisher.Publish(experiment, image.data(), DataMessage{.number = 20});
publisher.Publish(experiment, image.data(), DataMessage{.number = 18});
std::string s;
JFJochProtoBuf::PreviewFrame frame;
// Pixel mask
REQUIRE(socket.Receive(s, false) > 0);
// Frame
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 0);
std::vector<char> image_out = {frame.data().begin(), frame.data().end()};
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 12);
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 25);
REQUIRE(socket.Receive(s, false) < 0);
}
TEST_CASE("ZMQPreviewPublisher_GetPreviewImage","") {
ZMQContext context;
ZMQPreviewPublisher publisher(context, "inproc://#5");
@@ -67,12 +128,13 @@ TEST_CASE("ZMQPreviewPublisher_GetPreviewImage","[ZSTD]") {
for (auto &i: image)
i = distribution(g1);
publisher.Publish(experiment, image.data(), 546);
DataMessage message{.number = 564};
publisher.Publish(experiment, image.data(), message);
JFJochProtoBuf::PreviewFrame frame = publisher.GetPreviewImage();
REQUIRE(frame.pixel_depth() == 2);
REQUIRE(frame.image_number() == 546);
REQUIRE(frame.image_number() == 564);
std::vector<char> image_out = {frame.data().begin(), frame.data().end()};
REQUIRE(memcmp(image.data(), image_out.data(), experiment.GetPixelsNum() * experiment.GetPixelDepth()) == 0);