Merge pull request #216 from slsdetectorgroup/callbacksafe

Callbacksafe
This commit is contained in:
Dhanya Thattil 2020-11-17 16:26:07 +01:00 committed by GitHub
commit 2fa3ebd8e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 46 deletions

View File

@ -1,8 +1,8 @@
#include "qDrawPlot.h"
#include "SlsQt1DPlot.h"
#include "SlsQt2DPlot.h"
#include "sls/detectorData.h"
#include "qCloneWidget.h"
#include "sls/detectorData.h"
#include "sls/ToString.h"
#include "sls/detectorData.h"

View File

@ -64,13 +64,13 @@ int main(int argc, char *argv[]) {
}
try {
// How big should this try block be?
sls::Detector det(parser.multi_id());
sls::CmdProxy proxy(&det);
proxy.Call(parser.command(), parser.arguments(), parser.detector_id(),
action);
} catch (const sls::RuntimeError &e) {
// OK to catch and do nothing since this will print the error message
// and command line app will anyway exit
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}

View File

@ -660,11 +660,14 @@ void DetectorImpl::readFrameFromReceiver() {
nDetActualPixelsX, nDetActualPixelsY,
callbackImage, imagesize, dynamicRange,
currentFileIndex, completeImage);
dataReady(
thisData, currentFrameIndex,
((dynamicRange == 32 && eiger) ? currentSubFrameIndex : -1),
pCallbackArg);
try {
dataReady(
thisData, currentFrameIndex,
((dynamicRange == 32 && eiger) ? currentSubFrameIndex : -1),
pCallbackArg);
} catch (const std::exception &e) {
LOG(logERROR) << "Exception caught from callback: " << e.what();
}
delete thisData;
}
}
@ -1123,6 +1126,10 @@ int DetectorImpl::acquire() {
(end.tv_nsec - begin.tv_nsec) / 1000000000.0)
<< " seconds";
} catch (...) {
if (dataProcessingThread.joinable()){
setJoinThreadFlag(true);
dataProcessingThread.join();
}
setAcquiringFlag(false);
throw;
}

View File

@ -24,18 +24,18 @@ const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f,
fileFormat *ftype, bool fwenable, bool *mfwenable,
bool *dsEnable, uint32_t *freq,
uint32_t *timer, uint32_t *sfnum, bool *fp,
bool *act, bool *depaden, bool *sm,
std::vector<int> *cdl, int *cdo, int *cad)
bool *dsEnable, uint32_t *freq, uint32_t *timer,
uint32_t *sfnum, bool *fp, bool *act,
bool *depaden, bool *sm, std::vector<int> *cdl,
int *cdo, int *cad)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype),
dataStreamEnable(dsEnable), fileFormatType(ftype),
fileWriteEnable(fwenable), masterFileWriteEnable(mfwenable),
streamingFrequency(freq), streamingTimerInMs(timer),
streamingStartFnum(sfnum), activated(act),
deactivatedPaddingEnable(depaden), silentMode(sm),
framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad), firstStreamerFrame(false) {
deactivatedPaddingEnable(depaden), silentMode(sm), framePadding(fp),
ctbDbitList(cdl), ctbDbitOffset(cdo), ctbAnalogDataBytes(cad),
firstStreamerFrame(false) {
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void *)&timerBegin, 0, sizeof(timespec));
}
@ -185,12 +185,17 @@ void DataProcessor::ThreadExecution() {
return;
}
uint64_t fnum = ProcessAnImage(buffer);
uint64_t fnum = 0;
try {
fnum = ProcessAnImage(buffer);
} catch (const std::exception &e) {
fifo->FreeAddress(buffer);
return;
}
// stream (if time/freq to stream) or free
if (*dataStreamEnable && SendToStreamer()) {
// if first frame to stream, add frame index to fifo header (might not
// be the first)
// if first frame to stream, add frame index to fifo header (might
// not be the first)
if (firstStreamerFrame) {
firstStreamerFrame = false;
(*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) =
@ -256,22 +261,27 @@ uint64_t DataProcessor::ProcessAnImage(char *buf) {
RearrangeDbitData(buf);
}
// normal call back
if (rawDataReadyCallBack != nullptr) {
rawDataReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t *)buf)), pRawDataReady);
}
try {
// normal call back
if (rawDataReadyCallBack != nullptr) {
rawDataReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t *)buf)), pRawDataReady);
}
// call back with modified size
else if (rawDataModifyReadyCallBack != nullptr) {
auto revsize = (uint32_t)(*((uint32_t *)buf));
rawDataModifyReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
revsize, pRawDataReady);
(*((uint32_t *)buf)) = revsize;
// call back with modified size
else if (rawDataModifyReadyCallBack != nullptr) {
auto revsize = (uint32_t)(*((uint32_t *)buf));
rawDataModifyReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
revsize, pRawDataReady);
(*((uint32_t *)buf)) = revsize;
}
} catch (const std::exception &e) {
throw sls::RuntimeError("Get Data Callback Error: " +
std::string(e.what()));
}
// write to file
@ -284,8 +294,8 @@ uint64_t DataProcessor::ProcessAnImage(char *buf) {
// from previous call back
fnum - firstIndex, nump);
} catch (const sls::RuntimeError &e) {
; // ignore write exception for now (TODO: send error message via
// stopReceiver tcp)
; // ignore write exception for now (TODO: send error message
// via stopReceiver tcp)
}
}
return fnum;
@ -385,8 +395,8 @@ void DataProcessor::PadMissingPackets(char *buf) {
// missing packet
switch (myDetectorType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA +
// CACA, 639*2 bytes data
// for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes
// data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data +
// 640*2 bytes data !!
case GOTTHARD:

View File

@ -497,10 +497,15 @@ void Implementation::startReceiver() {
// callbacks
if (startAcquisitionCallBack) {
startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->imageSize) +
(generalData->fifoBufferHeaderSize),
pStartAcquisition);
try {
startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->imageSize) +
(generalData->fifoBufferHeaderSize),
pStartAcquisition);
} catch (const std::exception &e) {
throw sls::RuntimeError("Start Acquisition Callback Error: " +
std::string(e.what()));
}
if (rawDataReadyCallBack != nullptr) {
LOG(logINFO) << "Data Write has been defined externally";
}
@ -594,9 +599,20 @@ void Implementation::stopReceiver() {
LOG(logINFORED) << "Deactivated Receiver";
}
// callback
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((tot / numThreads),
pAcquisitionFinished);
if (acquisitionFinishedCallBack) {
try {
acquisitionFinishedCallBack((tot / numThreads),
pAcquisitionFinished);
} catch (const std::exception &e) {
// change status
status = IDLE;
LOG(logINFO) << "Receiver Stopped";
LOG(logINFO) << "Status: " << sls::ToString(status);
throw sls::RuntimeError(
"Acquisition Finished Callback Error: " +
std::string(e.what()));
}
}
}
// change status

View File

@ -70,6 +70,12 @@ ssize_t UdpRxSocket::ReceiveDataOnly(char *dst) noexcept {
LOG(logWARNING) << "Got header pkg";
r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
}
// temporary workaround for Eiger firmware (stop sends bad packets of size 8
// bytes)
if (r == 8) {
LOG(logWARNING) << "Ignoring bad packet of size 8 bytes";
r = recvfrom(sockfd_, dst, packet_size_, 0, nullptr, nullptr);
}
return r;
}