v1.0.0-rc.40
This commit is contained in:
+138
-60
@@ -17,23 +17,16 @@ StreamWriter::StreamWriter(Logger &in_logger,
|
||||
run_number(0) {
|
||||
}
|
||||
|
||||
void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
|
||||
bool run = WaitForImage();
|
||||
void StreamWriter::ProcessStartMessage() {
|
||||
if (state == StreamWriterState::Finalized)
|
||||
return; // Should not happen (?)
|
||||
|
||||
while (run && !image_puller_output.cbor->start_message) {
|
||||
if (image_puller_output.cbor->msg_type == CBORImageType::IMAGE)
|
||||
logger.Warning("Missing meaningful image while waiting for START");
|
||||
run = WaitForImage();
|
||||
}
|
||||
if (state != StreamWriterState::Idle)
|
||||
FinalizeDataCollection();
|
||||
|
||||
if (!run)
|
||||
return;
|
||||
err = "";
|
||||
|
||||
logger.Info("Starting writing for dataset {} of {} images",
|
||||
image_puller_output.cbor->start_message->file_prefix,
|
||||
image_puller_output.cbor->start_message->number_of_images);
|
||||
state = StreamWriterState::Started;
|
||||
uint64_t max_image_number = 0;
|
||||
max_image_number = 0;
|
||||
|
||||
processed_images = 0;
|
||||
processed_image_size = 0;
|
||||
@@ -42,60 +35,139 @@ void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
|
||||
run_number = image_puller_output.cbor->start_message->run_number;
|
||||
run_name = image_puller_output.cbor->start_message->run_name;
|
||||
|
||||
FileWriter writer(*image_puller_output.cbor->start_message);
|
||||
|
||||
if (!file_done_address.empty())
|
||||
writer.SetupFinalizedFileSocket(file_done_address);
|
||||
|
||||
socket_number = 0;
|
||||
if (image_puller_output.cbor->start_message->socket_number)
|
||||
socket_number = image_puller_output.cbor->start_message->socket_number.value();
|
||||
auto writer_notification_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr;
|
||||
writer_notification_zmq_addr = image_puller_output.cbor->start_message->writer_notification_zmq_addr;
|
||||
|
||||
bool first_image = true;
|
||||
run = WaitForImage();
|
||||
while (run && image_puller_output.cbor->calibration) {
|
||||
try {
|
||||
file_writer = std::make_unique<FileWriter>(*image_puller_output.cbor->start_message);
|
||||
if (!file_done_address.empty())
|
||||
file_writer->SetupFinalizedFileSocket(file_done_address);
|
||||
logger.Info("Starting writing for dataset {} of {} images",
|
||||
image_puller_output.cbor->start_message->file_prefix,
|
||||
image_puller_output.cbor->start_message->number_of_images);
|
||||
state = StreamWriterState::Started;
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error writing start message - switching to error state");
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
}
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessCalibrationImage() {
|
||||
switch (state) {
|
||||
case StreamWriterState::Started:
|
||||
try {
|
||||
file_writer->WriteHDF5(*image_puller_output.cbor->calibration);
|
||||
} catch (const std::exception &e) {
|
||||
logger.Warning(e.what());
|
||||
logger.Warning("Error during writing calibration data - skipping");
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Receiving:
|
||||
logger.Warning("Unexpected calibration message");
|
||||
break;
|
||||
case StreamWriterState::Error:
|
||||
case StreamWriterState::Idle:
|
||||
case StreamWriterState::Finalized:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessDataImage() {
|
||||
switch (state) {
|
||||
case StreamWriterState::Idle:
|
||||
logger.Warning("Missing meaningful image while waiting for START");
|
||||
mute_data_msg_in_idle = true;
|
||||
break;
|
||||
case StreamWriterState::Started:
|
||||
start_time = std::chrono::system_clock::now();
|
||||
state = StreamWriterState::Receiving;
|
||||
// Follow through to receiving - no brake!
|
||||
case StreamWriterState::Receiving:
|
||||
try {
|
||||
file_writer->WriteHDF5(*image_puller_output.cbor->data_message);
|
||||
if (max_image_number < image_puller_output.cbor->data_message->number + 1)
|
||||
max_image_number = image_puller_output.cbor->data_message->number + 1;
|
||||
|
||||
processed_images++;
|
||||
processed_image_size += image_puller_output.cbor->data_message->image.GetCompressedSize();
|
||||
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Warning("Error writing image - switching to error state");
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Error:
|
||||
case StreamWriterState::Finalized:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void StreamWriter::ProcessEndMessage() {
|
||||
// Ignore end message when idle state!
|
||||
if (state == StreamWriterState::Idle || state == StreamWriterState::Finalized)
|
||||
return;
|
||||
|
||||
if (state != StreamWriterState::Error) {
|
||||
try {
|
||||
writer.WriteHDF5(*image_puller_output.cbor->calibration);
|
||||
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
|
||||
image_puller_output.cbor->end_message->max_image_number = max_image_number;
|
||||
file_writer->WriteHDF5(*image_puller_output.cbor->end_message);
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error writing end message - switching to error state");
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
}
|
||||
run = WaitForImage();
|
||||
}
|
||||
FinalizeDataCollection();
|
||||
}
|
||||
|
||||
while (run && image_puller_output.cbor->data_message) {
|
||||
if (first_image) {
|
||||
state = StreamWriterState::Receiving;
|
||||
start_time = std::chrono::system_clock::now();
|
||||
first_image = false;
|
||||
void StreamWriter::FinalizeDataCollection() {
|
||||
end_time = std::chrono::system_clock::now();
|
||||
|
||||
if (file_writer && (state != StreamWriterState::Error)) {
|
||||
try {
|
||||
hdf5_data_file_statistics = file_writer->Finalize();
|
||||
} catch (JFJochException &e) {
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error finalizing writing - switching to error state");
|
||||
}
|
||||
} else {
|
||||
hdf5_data_file_statistics.clear();
|
||||
}
|
||||
file_writer.reset();
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr);
|
||||
logger.Info("Data writing finished");
|
||||
state = StreamWriterState::Finalized;
|
||||
}
|
||||
|
||||
writer.WriteHDF5(*image_puller_output.cbor->data_message);
|
||||
if (max_image_number < image_puller_output.cbor->data_message->number + 1)
|
||||
max_image_number = image_puller_output.cbor->data_message->number + 1;
|
||||
void StreamWriter::CollectImages() {
|
||||
state = StreamWriterState::Idle;
|
||||
mute_data_msg_in_idle = false;
|
||||
|
||||
processed_images++;
|
||||
processed_image_size += image_puller_output.cbor->data_message->image.size;
|
||||
bool run = true;
|
||||
while (run && state != StreamWriterState::Finalized) {
|
||||
run = WaitForImage();
|
||||
|
||||
if (image_puller_output.cbor->start_message)
|
||||
ProcessStartMessage();
|
||||
else if (image_puller_output.cbor->calibration)
|
||||
ProcessCalibrationImage();
|
||||
else if (image_puller_output.cbor->data_message)
|
||||
ProcessDataImage();
|
||||
else if (image_puller_output.cbor->end_message)
|
||||
ProcessEndMessage();
|
||||
else
|
||||
logger.Warning("Unknown message type");
|
||||
}
|
||||
|
||||
bool data_collection_seq_ok = false; // Data collection finished with a proper end message
|
||||
|
||||
if (run && image_puller_output.cbor->end_message) {
|
||||
end_time = std::chrono::system_clock::now();
|
||||
|
||||
if ((image_puller_output.cbor->end_message->max_image_number == 0) && (max_image_number > 0))
|
||||
image_puller_output.cbor->end_message->max_image_number = max_image_number;
|
||||
|
||||
writer.WriteHDF5(*image_puller_output.cbor->end_message);
|
||||
|
||||
data_collection_seq_ok = true;
|
||||
}
|
||||
|
||||
v = writer.Finalize();
|
||||
state = data_collection_seq_ok ? StreamWriterState::Idle : StreamWriterState::Error;
|
||||
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr, data_collection_seq_ok);
|
||||
}
|
||||
|
||||
void StreamWriter::Cancel() {
|
||||
@@ -104,15 +176,18 @@ void StreamWriter::Cancel() {
|
||||
}
|
||||
|
||||
StreamWriterOutput StreamWriter::Run() {
|
||||
StreamWriterOutput ret;
|
||||
hdf5_data_file_statistics.clear();
|
||||
try {
|
||||
CollectImages(ret.data_file_stats);
|
||||
} catch (JFJochException &e) {
|
||||
CollectImages();
|
||||
} catch (std::exception &e) {
|
||||
// Error during collecting images will skip to end data collection
|
||||
// End data collection will consume all images till the end
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Exception not properly handled by CollectImages()");
|
||||
}
|
||||
|
||||
StreamWriterOutput ret;
|
||||
ret.data_file_stats = hdf5_data_file_statistics;
|
||||
ret.stats = GetStatistics();
|
||||
logger.Info("Write task done. Images = {} Throughput = {:.0f} MB/s Frame rate = {:.0f} Hz max occupation of FIFO {}",
|
||||
ret.stats.processed_images, ret.stats.performance_MBs, ret.stats.performance_Hz,
|
||||
@@ -163,8 +238,7 @@ StreamWriterStatistics StreamWriter::GetStatistics() const {
|
||||
};
|
||||
}
|
||||
|
||||
void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_update_zmq_addr,
|
||||
bool ok) {
|
||||
void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_update_zmq_addr) {
|
||||
if (debug_skip_write_notification) {
|
||||
logger.Info("StreamWriter: Skipping notification");
|
||||
return;
|
||||
@@ -177,13 +251,16 @@ void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_up
|
||||
|
||||
auto stats = GetStatistics();
|
||||
j["socket_number"] = socket_number;
|
||||
j["ok"] = ok;
|
||||
j["processed_images"] = processed_images.load();
|
||||
j["socket_number"] = stats.socket_number;
|
||||
j["run_number"] = stats.run_number;
|
||||
j["run_name"] = stats.run_name;
|
||||
j["performance_MBs"] = stats.performance_MBs;
|
||||
|
||||
if (state == StreamWriterState::Error) {
|
||||
j["ok"] = false;
|
||||
j["error"] = err;
|
||||
} else
|
||||
j["ok"] = true;
|
||||
try {
|
||||
logger.Info("Sending notification to {}", detector_update_zmq_addr);
|
||||
ZMQSocket s(ZMQSocketType::Push);
|
||||
@@ -192,6 +269,7 @@ void StreamWriter::NotifyReceiverOnFinalizedWrite(const std::string &detector_up
|
||||
s.Send(j.dump());
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error sending notification to detector update socket");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user