v1.0.0-rc.129 (#36)
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 11m14s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 10m43s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 11m35s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 9m20s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 10m23s
Build Packages / Generate python client (push) Successful in 39s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 11m24s
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 1m0s
Build Packages / build:rpm (rocky8) (push) Successful in 10m35s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 10m35s
Build Packages / build:rpm (rocky9) (push) Successful in 11m17s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m9s
Build Packages / Unit tests (push) Failing after 1h18m57s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 11m14s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 10m43s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 11m35s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 9m20s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 10m23s
Build Packages / Generate python client (push) Successful in 39s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 11m24s
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 1m0s
Build Packages / build:rpm (rocky8) (push) Successful in 10m35s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 10m35s
Build Packages / build:rpm (rocky9) (push) Successful in 11m17s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m9s
Build Packages / Unit tests (push) Failing after 1h18m57s
This is an UNSTABLE release. It contains significant modifications and bug fixes; if things go wrong, it is better to revert to 1.0.0-rc.124.

* jfjoch_broker: Significant improvements to the TCP image socket, making it a viable alternative to ZeroMQ sockets (only a single port on the broker side, the number of writers can be changed dynamically, acknowledgments for written files)
* jfjoch_broker: Delta phi is also calculated for still data in Bragg prediction
* jfjoch_broker: Image pusher statistics are accessible via the REST interface
* jfjoch_writer: Supports the TCP image socket and, for it, an auto-forking option

Reviewed-on: #36
Co-authored-by: Filip Leonarski <filip.leonarski@psi.ch>
Co-committed-by: Filip Leonarski <filip.leonarski@psi.ch>
This commit was merged in pull request #36.
This commit is contained in:
+52
-1
@@ -20,6 +20,27 @@ StreamWriter::StreamWriter(Logger &in_logger,
|
||||
max_image_number(0) {
|
||||
}
|
||||
|
||||
// Builds an acknowledgment for the given TCP frame type and hands it to the
// image puller.
//
// ack_for    - frame type being acknowledged
// ok         - copied into the acknowledgment's `ok` field
// fatal      - copied into the acknowledgment's `fatal` field
// code       - copied into the acknowledgment's `error_code` field
// error_text - copied into the acknowledgment's `error_text` field
//
// Does nothing when the image puller reports that it does not support
// acknowledgments. A failed send is only logged as a warning.
void StreamWriter::NotifyTcpAck(TCPFrameType ack_for, bool ok, bool fatal, TCPAckCode code, const std::string &error_text) {
    if (image_puller.SupportsAck()) {
        PullerAckMessage reply;

        // Writer-side context: run, socket and running image counter
        reply.run_number = run_number;
        reply.socket_number = static_cast<uint32_t>(socket_number);
        reply.processed_images = processed_images.load();

        // Outcome as reported by the caller
        reply.ack_for = ack_for;
        reply.ok = ok;
        reply.fatal = fatal;
        reply.error_code = code;
        reply.error_text = error_text;

        // The image number is filled in only when the current puller output
        // carries a data message; otherwise the field is left as constructed.
        const auto &payload = image_puller_output.cbor;
        if (payload && payload->data_message)
            reply.image_number = payload->data_message->number;

        if (!image_puller.SendAck(reply))
            logger.Warning("Failed to send TCP ACK");
    }
}
|
||||
|
||||
void StreamWriter::ProcessStartMessage() {
|
||||
if (state == StreamWriterState::Finalized)
|
||||
return; // Should not happen (?)
|
||||
@@ -28,6 +49,7 @@ void StreamWriter::ProcessStartMessage() {
|
||||
FinalizeDataCollection();
|
||||
|
||||
err = "";
|
||||
tcp_data_fatal_sent = false;
|
||||
|
||||
max_image_number = 0;
|
||||
|
||||
@@ -51,11 +73,13 @@ void StreamWriter::ProcessStartMessage() {
|
||||
image_puller_output.cbor->start_message->file_prefix,
|
||||
image_puller_output.cbor->start_message->number_of_images);
|
||||
state = StreamWriterState::Started;
|
||||
NotifyTcpAck(TCPFrameType::START, true, false, TCPAckCode::None);
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Error("Error writing start message - switching to error state");
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
NotifyTcpAck(TCPFrameType::START, false, true, TCPAckCode::StartFailed, err);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,6 +91,7 @@ void StreamWriter::ProcessCalibrationImage() {
|
||||
} catch (const std::exception &e) {
|
||||
logger.Warning(e.what());
|
||||
logger.Warning("Error during writing calibration data - skipping");
|
||||
NotifyTcpAck(TCPFrameType::CALIBRATION, false, false, TCPAckCode::DataWriteFailed, e.what());
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Receiving:
|
||||
@@ -103,14 +128,17 @@ void StreamWriter::ProcessDataImage() {
|
||||
processed_image_size += image_puller_output.cbor->data_message->image.GetCompressedSize();
|
||||
if (verbose)
|
||||
logger.Info("Written");
|
||||
NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None);
|
||||
} catch (const JFJochException &e) {
|
||||
logger.ErrorException(e);
|
||||
logger.Warning("Error writing image - switching to error state");
|
||||
state = StreamWriterState::Error;
|
||||
err = e.what();
|
||||
NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err);
|
||||
}
|
||||
break;
|
||||
case StreamWriterState::Error:
|
||||
// Error state => Wait till end only
|
||||
case StreamWriterState::Finalized:
|
||||
break;
|
||||
}
|
||||
@@ -136,7 +164,16 @@ void StreamWriter::ProcessEndMessage() {
|
||||
err = e.what();
|
||||
}
|
||||
}
|
||||
bool error_state = (state == StreamWriterState::Error);
|
||||
|
||||
FinalizeDataCollection();
|
||||
|
||||
// Notifications happen only when handling END message
|
||||
// No end message ==> no need to ACK
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr);
|
||||
NotifyTcpAck(TCPFrameType::END, !error_state, error_state,
|
||||
error_state ? TCPAckCode::EndFailed : TCPAckCode::None,
|
||||
error_state ? err : "");
|
||||
}
|
||||
|
||||
void StreamWriter::FinalizeDataCollection() {
|
||||
@@ -155,7 +192,6 @@ void StreamWriter::FinalizeDataCollection() {
|
||||
hdf5_data_file_statistics.clear();
|
||||
}
|
||||
file_writer.reset();
|
||||
NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr);
|
||||
logger.Info("Data writing finished");
|
||||
state = StreamWriterState::Finalized;
|
||||
}
|
||||
@@ -168,6 +204,21 @@ void StreamWriter::CollectImages() {
|
||||
while (run && state != StreamWriterState::Finalized) {
|
||||
run = WaitForImage();
|
||||
|
||||
if (image_puller_output.tcp_msg &&
|
||||
static_cast<TCPFrameType>(image_puller_output.tcp_msg->header.type) == TCPFrameType::CANCEL) {
|
||||
logger.Warning("Received TCP CANCEL, finalizing data collection");
|
||||
if (state != StreamWriterState::Idle && state != StreamWriterState::Finalized)
|
||||
FinalizeDataCollection();
|
||||
NotifyTcpAck(TCPFrameType::CANCEL, true, false, TCPAckCode::None);
|
||||
state = StreamWriterState::Finalized;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!image_puller_output.cbor) {
|
||||
logger.Warning("Missing CBOR payload for non-CANCEL TCP frame");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (image_puller_output.cbor->start_message)
|
||||
ProcessStartMessage();
|
||||
else if (image_puller_output.cbor->calibration)
|
||||
|
||||
Reference in New Issue
Block a user