ZMQPreviewPublisher: Stream CBOR data

This commit is contained in:
2023-11-08 19:19:32 +01:00
parent cc34a9801e
commit 56476e3e5f
7 changed files with 157 additions and 279 deletions
+6 -39
View File
@@ -9,56 +9,23 @@ ZMQPreviewPublisher::ZMQPreviewPublisher(ZMQContext& context, const std::string&
socket.Bind(addr);
}
void ZMQPreviewPublisher::Start(const DiffractionExperiment &experiment, const JFCalibration &calibration) {
void ZMQPreviewPublisher::StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride) {
{
std::unique_lock<std::mutex> ul(m);
stride = experiment.GetPreviewStride();
stride = preview_stride;
current_part = -1;
}
auto mask = calibration.CalculateNexusMask(experiment);
JFJochProtoBuf::PreviewFrame frame;
frame.set_image_number(-1);
frame.set_width(experiment.GetXPixelsNum());
frame.set_height(experiment.GetYPixelsNum());
frame.set_pixel_depth(4);
frame.set_data(mask.data(), experiment.GetPixelsNum() * sizeof(uint32_t));
socket.Send(grpcToJson(frame));
socket.Send(image_data, image_size);
}
void ZMQPreviewPublisher::Stop(const DiffractionExperiment& experiment) {}
void ZMQPreviewPublisher::Publish(const DiffractionExperiment& experiment, const void* image_data, const DataMessage &message) {
void ZMQPreviewPublisher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
{
std::unique_lock<std::mutex> ul(m);
int64_t part = message.number / stride;
int64_t part = image_number / stride;
if (current_part >= part)
return;
else
current_part = part;
}
JFJochProtoBuf::PreviewFrame frame;
frame.set_image_number(message.number);
frame.set_total_images(experiment.GetImageNum());
frame.set_wavelength_a(experiment.GetWavelength_A());
frame.set_beam_x_pxl(experiment.GetBeamX_pxl());
frame.set_beam_y_pxl(experiment.GetBeamY_pxl());
frame.set_saturation_value(experiment.GetOverflow());
frame.set_file_prefix(experiment.GetFilePrefix());
frame.set_detector_distance_mm(experiment.GetDetectorDistance_mm());
frame.set_width(experiment.GetXPixelsNum());
frame.set_height(experiment.GetYPixelsNum());
frame.set_pixel_depth(experiment.GetPixelDepth());
frame.set_data(image_data, experiment.GetPixelsNum() * experiment.GetPixelDepth());
for (const auto &s: message.spots) {
auto fr = frame.add_spots();
fr->set_x(s.x);
fr->set_y(s.y);
fr->set_indexed(s.indexed);
}
socket.Send(grpcToJson(frame));
socket.Send(image_data, image_size);
}
+2 -3
View File
@@ -19,9 +19,8 @@ class ZMQPreviewPublisher {
mutable std::mutex m;
public:
ZMQPreviewPublisher(ZMQContext& context, const std::string& addr);
void Start(const DiffractionExperiment& experiment, const JFCalibration &calibration);
void Publish(const DiffractionExperiment& experiment, const void* image_data, const DataMessage &message);
void Stop(const DiffractionExperiment& experiment);
void StartDataCollection(const uint8_t *image_data, size_t image_size, int64_t preview_stride);
void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number);
};
-23
View File
@@ -405,29 +405,6 @@ message DataProcessingSettings {
bool preview_indexed_only = 10;
}
message PreviewFrameSpot {
float x = 1;
float y = 2;
bool indexed = 3;
}
message PreviewFrame {
int64 image_number = 1;
int64 total_images = 2;
float wavelength_A = 3;
float beam_x_pxl = 4;
float beam_y_pxl = 5;
float detector_distance_mm = 6;
int64 saturation_value = 7;
string file_prefix = 8;
int64 width = 9;
int64 height = 10;
int64 pixel_depth = 11;
reserved 12;
bytes data = 13;
repeated PreviewFrameSpot spots = 14;
}
// Broker
message ModuleStatistics {
+39 -43
View File
File diff suppressed because one or more lines are too long
+71 -73
View File
@@ -72,9 +72,6 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
logger.Info("NUMA policy: {}", numa_policy.GetName());
if (experiment.GetDetectorMode() == DetectorMode::Conversion) {
if (preview_publisher != nullptr)
preview_publisher->Start(experiment, calib.value());
rad_int_mapping = std::make_unique<RadialIntegrationMapping>(experiment);
rad_int_profile = std::make_unique<RadialIntegrationProfile>(*rad_int_mapping, experiment);
rad_int_corr = CalcRadIntCorr(experiment);
@@ -136,74 +133,76 @@ JFJochReceiver::JFJochReceiver(const JFJochProtoBuf::ReceiverInput &settings,
if (experiment.GetImageNum() > 0) {
logger.Info("Data file count {}", experiment.GetDataFileCount());
if (push_images_to_writer) {
StartMessage message{};
experiment.FillMessage(message);
message.arm_date = time_UTC(std::chrono::system_clock::now());
StartMessage message{};
experiment.FillMessage(message);
message.arm_date = time_UTC(std::chrono::system_clock::now());
JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4);
std::vector<uint8_t> pixel_mask;
std::vector<std::vector<uint8_t> > pedestal;
JFJochBitShuffleCompressor compressor(CompressionAlgorithm::BSHUF_LZ4);
std::vector<uint8_t> pixel_mask;
std::vector<std::vector<uint8_t> > pedestal;
if (calib) {
size_t xpixel = experiment.GetXPixelsNum();
size_t ypixel = experiment.GetYPixelsNum();
if (calib) {
size_t xpixel = experiment.GetXPixelsNum();
size_t ypixel = experiment.GetYPixelsNum();
pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0));
message.AddPixelMask(CBORImage{
.data = pixel_mask.data(),
.size = pixel_mask.size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 4,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = "sc0"
});
if (experiment.GetSaveCalibration()) {
for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) {
for (int gain = 0; gain < 3; gain++) {
auto tmp = compressor.Compress(calib->GetPedestal(gain, sc));
pedestal.emplace_back(tmp);
std::string channel = "pedestal_G" + std::to_string(gain);
pixel_mask = compressor.Compress(calib->CalculateNexusMask(experiment, 0));
message.AddPixelMask(CBORImage{
.data = pixel_mask.data(),
.size = pixel_mask.size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 4,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = "sc0"
});
if (experiment.GetSaveCalibration()) {
for (int sc = 0; sc < experiment.GetStorageCellNumber(); sc++) {
for (int gain = 0; gain < 3; gain++) {
auto tmp = compressor.Compress(calib->GetPedestal(gain, sc));
pedestal.emplace_back(tmp);
std::string channel = "pedestal_G" + std::to_string(gain);
if (experiment.GetStorageCellNumber() > 1)
channel += "_sc" + std::to_string(sc);
if (experiment.GetStorageCellNumber() > 1)
channel += "_sc" + std::to_string(sc);
CBORImage image{
.data = pedestal.at(pedestal.size() - 1).data(),
.size = pedestal.at(pedestal.size() - 1).size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 2,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = channel
};
CBORImage image{
.data = pedestal.at(pedestal.size() - 1).data(),
.size = pedestal.at(pedestal.size() - 1).size(),
.xpixel = (size_t) xpixel,
.ypixel = (size_t) ypixel,
.pixel_depth_bytes = 2,
.pixel_is_signed = false,
.pixel_is_float = false,
.algorithm = CompressionAlgorithm::BSHUF_LZ4,
.channel = channel
};
message.AddCalibration(image);
}
message.AddCalibration(image);
}
}
}
}
if (rad_int_mapping) {
message.rad_int_bin_number = rad_int_mapping->GetBinNumber();
message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ();
message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr();
} else
message.rad_int_bin_number = 0;
if (rad_int_mapping) {
message.rad_int_bin_number = rad_int_mapping->GetBinNumber();
message.rad_int_bin_to_q = rad_int_mapping->GetBinToQ();
message.rad_int_solid_angle_corr = rad_int_mapping->GetSolidAngleCorr();
} else
message.rad_int_bin_number = 0;
std::vector<uint8_t> serialization_buffer(message.approx_size);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
std::vector<uint8_t> serialization_buffer(message.approx_size);
JFJochFrameSerializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(message);
serializer.SerializeSequenceStart(message);
if (preview_publisher != nullptr)
preview_publisher->StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(),
experiment.GetPreviewStride());
if (push_images_to_writer)
image_pusher.StartDataCollection(serialization_buffer.data(), serializer.GetBufferSize(),
experiment.GetDataFileCount());
}
for (int i = 0; i < experiment.GetImageNum(); i++)
images_to_go.Put(i);
@@ -465,26 +464,28 @@ void JFJochReceiver::FrameTransformationThread() {
*rad_int_profile_per_file[image_number % experiment.GetDataFileCount()] += *rad_int_profile_image;
}
message.receiver_available_send_buffers = GetAvailableSendBuffers();
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
PrepareCBORImage(message, experiment, nullptr, 0);
serializer.SerializeImage(message);
if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Not enough memory to save image");
size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset());
serializer.AppendImage(image_size);
if (preview_publisher && (!local_data_processing_settings.preview_indexed_only() || indexed))
preview_publisher->Publish(experiment, transformation.GetPreviewImage(), message);
preview_publisher->SendImage(ptr, serializer.GetBufferSize(), image_number);
if (push_images_to_writer) {
message.receiver_available_send_buffers = GetAvailableSendBuffers();
auto send_buffer_handle = send_buffer_avail.GetBlocking();
auto ptr = send_buffer + send_buffer_size * send_buffer_handle;
JFJochFrameSerializer serializer(ptr, send_buffer_size);
PrepareCBORImage(message, experiment, nullptr, 0);
serializer.SerializeImage(message);
if (serializer.GetRemainingBuffer() < experiment.GetMaxCompressedSize())
throw JFJochException(JFJochExceptionCategory::ArrayOutOfBounds,
"Not enough memory to save image");
size_t image_size = transformation.SaveCompressedImage(ptr + serializer.GetImageAppendOffset());
serializer.AppendImage(image_size);
image_pusher.SendImage(ptr, serializer.GetBufferSize(), image_number,
&send_buffer_zero_copy_ret_val[send_buffer_handle]);
compressed_size += image_size;
}
} else
send_buffer_avail.Put(send_buffer_handle);
UpdateMaxImage(image_number);
images_sent++;
@@ -609,9 +610,6 @@ void JFJochReceiver::FinalizeMeasurement() {
logger.Info("Disconnected from writers");
}
if (preview_publisher != nullptr)
preview_publisher->Stop(experiment);
for (int d = 0; d < ndatastreams; d++)
acquisition_device[d]->Cancel();
+21 -47
View File
@@ -648,36 +648,24 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
std::string s;
JFJochFrameDeserializer deserializer;
// Pixel mask
REQUIRE(rcv_preview_socket.Receive(s, false) > 0);
JFJochProtoBuf::PreviewFrame frame;
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
// Check header
REQUIRE(frame.image_number() == -1);
REQUIRE(frame.width() == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(frame.height() == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
REQUIRE(frame.pixel_depth() == 4);
deserializer.Process((uint8_t *) s.data(), s.size());
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START);
auto start_msg = deserializer.GetStartMessage();
REQUIRE(start_msg.image_size_x == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(start_msg.image_size_y == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
// First frame
REQUIRE(rcv_preview_socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
deserializer.Process((uint8_t *) s.data(), s.size());
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
auto image_msg = deserializer.GetDataMessage();
// Check header
REQUIRE(frame.image_number() == 0);
REQUIRE(frame.beam_x_pxl() == Approx(setup.beam_x_pxl()));
REQUIRE(frame.beam_y_pxl() == Approx(setup.beam_y_pxl()));
REQUIRE(frame.width() == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(frame.height() == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
REQUIRE(frame.pixel_depth() == 2);
// Check compressed image
size_t npixel = state_machine.NotThreadSafe_Experiment().GetPixelsNum();
std::vector<char> rcv_image(frame.data().size());
rcv_image = {frame.data().begin(), frame.data().end()};
REQUIRE(rcv_image.size() == state_machine.NotThreadSafe_Experiment().GetPixelsNum() * sizeof(int16_t));
REQUIRE(image_msg.number == 0);
// Check no more frames waiting
REQUIRE(rcv_preview_socket.Receive(s, false) == -1);
@@ -694,7 +682,6 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview", "[JFJochReceiver]") {
writer_server->Shutdown();
}
TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]") {
Logger logger("JFJochIntegrationTest_ZMQ_with_preview_no_writer");
@@ -767,37 +754,24 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_with_preview_no_writer", "[JFJochReceiver]"
logger.Info("Stopped measurement");
std::string s;
JFJochFrameDeserializer deserializer;
// Pixel mask
REQUIRE(rcv_preview_socket.Receive(s, false) > 0);
JFJochProtoBuf::PreviewFrame frame;
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
// Check header
REQUIRE(frame.image_number() == -1);
REQUIRE(frame.width() == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(frame.height() == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
REQUIRE(frame.pixel_depth() == 4);
deserializer.Process((uint8_t *) s.data(), s.size());
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START);
auto start_msg = deserializer.GetStartMessage();
REQUIRE(start_msg.image_size_x == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(start_msg.image_size_y == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
// First frame
REQUIRE(rcv_preview_socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
deserializer.Process((uint8_t *) s.data(), s.size());
REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE);
auto image_msg = deserializer.GetDataMessage();
// Check header
REQUIRE(frame.image_number() == 0);
REQUIRE(frame.beam_x_pxl() == Approx(setup.beam_x_pxl()));
REQUIRE(frame.beam_y_pxl() == Approx(setup.beam_y_pxl()));
REQUIRE(frame.width() == state_machine.NotThreadSafe_Experiment().GetXPixelsNum());
REQUIRE(frame.height() == state_machine.NotThreadSafe_Experiment().GetYPixelsNum());
REQUIRE(frame.pixel_depth() == 2);
// Check compressed image
size_t npixel = state_machine.NotThreadSafe_Experiment().GetPixelsNum();
std::vector<char> rcv_image(frame.data().size());
rcv_image = {frame.data().begin(), frame.data().end()};
REQUIRE(rcv_image.size() == state_machine.NotThreadSafe_Experiment().GetPixelsNum() * sizeof(int16_t));
REQUIRE(image_msg.number == 0);
// Check no more frames waiting
REQUIRE(rcv_preview_socket.Receive(s, false) == -1);
+18 -51
View File
@@ -14,48 +14,23 @@ TEST_CASE("ZMQPreviewPublisher","[ZMQ]") {
DiffractionExperiment experiment(DetectorGeometry(1, 1, 0, 0, false));
std::string z = "Msg 0";
JFCalibration calibration(experiment);
publisher.Start(experiment, calibration);
publisher.StartDataCollection((uint8_t *) z.data(), z.size(), experiment.GetPreviewStride());
std::vector<int16_t> image(experiment.GetPixelsNum());
// Predictable random number generator
std::mt19937 g1(19876);
std::uniform_int_distribution<int16_t> distribution(-200,25000);
for (auto &i: image)
i = distribution(g1);
std::vector<SpotToSave> spots;
spots.push_back(SpotToSave{.x = 7, .y = 8, .intensity = 34, .indexed = false});
spots.push_back(SpotToSave{.x = 37, .y = 48, .intensity = 123, .indexed = true});
DataMessage message{.number = 564, .spots = spots};
publisher.Publish(experiment, image.data(), message);
z = "Msg 1";
publisher.SendImage((uint8_t *) z.data(), z.size(), 0);
std::string s;
// Pixel mask
REQUIRE(socket.Receive(s, false) > 0);
JFJochProtoBuf::PreviewFrame frame;
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.pixel_depth() == 4);
REQUIRE(frame.image_number() == -1);
REQUIRE(s == "Msg 0");
// Frame
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.pixel_depth() == 2);
REQUIRE(frame.image_number() == 564);
std::vector<char> image_out = {frame.data().begin(), frame.data().end()};
REQUIRE(memcmp(image.data(), image_out.data(), experiment.GetPixelsNum() * experiment.GetPixelDepth()) == 0);
REQUIRE(frame.spots_size() == 2);
REQUIRE(frame.spots(0).x() == 7);
REQUIRE(!frame.spots(0).indexed());
REQUIRE(frame.spots(1).y() == 48);
REQUIRE(frame.spots(1).indexed());
REQUIRE(s == "Msg 1");
REQUIRE(socket.Receive(s, false) < 0);
}
@@ -73,39 +48,31 @@ TEST_CASE("ZMQPreviewPublisher_FrameNumbers","[ZMQ]") {
REQUIRE(experiment.GetPreviewStride() == 10);
JFCalibration calibration(experiment);
publisher.Start(experiment, calibration);
std::vector<int16_t> image(experiment.GetPixelsNum());
std::string z = "Start";
publisher.Publish(experiment, image.data(), DataMessage{.number = 0});
publisher.Publish(experiment, image.data(), DataMessage{.number = 9});
publisher.Publish(experiment, image.data(), DataMessage{.number = 12});
publisher.Publish(experiment, image.data(), DataMessage{.number = 10});
publisher.Publish(experiment, image.data(), DataMessage{.number = 25});
publisher.Publish(experiment, image.data(), DataMessage{.number = 20});
publisher.Publish(experiment, image.data(), DataMessage{.number = 18});
publisher.StartDataCollection((uint8_t *) z.data(), z.size(), experiment.GetPreviewStride());
for (const auto& i: {0, 9, 12, 10, 25, 20, 18}) {
z = "Msg " + std::to_string(i);
publisher.SendImage((uint8_t *) z.data(), z.size(), i);
}
std::string s;
JFJochProtoBuf::PreviewFrame frame;
// Pixel mask
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE(s == "Start");
// Frame
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 0);
std::vector<char> image_out = {frame.data().begin(), frame.data().end()};
REQUIRE(s == "Msg 0");
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 12);
REQUIRE(s == "Msg 12");
REQUIRE(socket.Receive(s, false) > 0);
REQUIRE_NOTHROW(frame = jsonToGrpc<JFJochProtoBuf::PreviewFrame>(s));
REQUIRE(frame.image_number() == 25);
REQUIRE(s == "Msg 25");
REQUIRE(socket.Receive(s, false) < 0);
}