merge conflict resolved

This commit is contained in:
2018-10-30 14:31:20 +01:00
17 changed files with 4688 additions and 4958 deletions

View File

@ -22,7 +22,7 @@
const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
fileFormat* ftype, bool fwenable,
bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
@ -125,7 +125,7 @@ void DataProcessor::StopRunning() {
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo*& f) {
void DataProcessor::SetFifo(Fifo* f) {
fifo = f;
}

View File

@ -15,7 +15,7 @@
const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, std::vector<ROI>* r,
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, std::vector<ROI>* r,
uint64_t* fi, int* fd, char* ajh, bool* sm) :
ThreadObject(ind),
runningFlag(0),
@ -70,7 +70,7 @@ void DataStreamer::StopRunning() {
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo*& f) {
void DataStreamer::SetFifo(Fifo* f) {
fifo = f;
}

View File

@ -18,7 +18,7 @@
const std::string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
Listener::Listener(int ind, detectorType dtype, Fifo* f, runStatus* s,
uint32_t* portno, char* e, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
@ -108,7 +108,7 @@ void Listener::StopRunning() {
}
void Listener::SetFifo(Fifo*& f) {
void Listener::SetFifo(Fifo* f) {
fifo = f;
}

View File

@ -42,20 +42,9 @@ void slsReceiverImplementation::DeleteMembers() {
generalData=0;
}
for (auto* it : listener)
delete it;
listener.clear();
for (auto* it : dataProcessor)
delete it;
dataProcessor.clear();
for (auto* it : dataStreamer)
delete it;
dataStreamer.clear();
for (auto* it : fifo)
delete it;
fifo.clear();
}
@ -83,7 +72,6 @@ void slsReceiverImplementation::InitializeMembers() {
//*** receiver parameters ***
numThreads = 1;
numberofJobs = 1;
nroichannels = 0;
status = IDLE;
activated = true;
@ -217,15 +205,14 @@ uint64_t slsReceiverImplementation::getTotalFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
std::vector<DataProcessor*>::const_iterator it;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0);
sum += (*it)->GetNumTotalFramesCaught();
for (const auto& it : dataProcessor){
flagsum += it->GetMeasurementStartedFlag();
sum += it->GetNumTotalFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
if (flagsum != dataProcessor.size())
return 0;
return (sum/dataProcessor.size());
}
@ -233,13 +220,13 @@ uint64_t slsReceiverImplementation::getFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetNumFramesCaught();
for (const auto& it : dataProcessor){
flagsum += it->GetMeasurementStartedFlag();
sum += it->GetNumFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
if (flagsum != dataProcessor.size())
return 0;
return (sum/dataProcessor.size());
}
@ -248,13 +235,13 @@ int64_t slsReceiverImplementation::getAcquisitionIndex() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetActualProcessedAcquisitionIndex();
for (const auto& it : dataProcessor){
flagsum += it->GetMeasurementStartedFlag();
sum += it->GetActualProcessedAcquisitionIndex();
}
//no data processed
if (flagsum != dataProcessor.size()) return -1;
if (flagsum != dataProcessor.size())
return -1;
return (sum/dataProcessor.size());
}
@ -409,25 +396,25 @@ void slsReceiverImplementation::setDetectorHostname(const char *c){
void slsReceiverImplementation::setMultiDetectorSize(const int* size) {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
char message[100];
strcpy(message, "Detector Size: (");
std::string log_message = "Detector Size: (";
for (int i = 0; i < MAX_DIMENSIONS; ++i) {
if (myDetectorType == EIGER && (!i))
numDet[i] = size[i]*2;
else
numDet[i] = size[i];
sprintf(message,"%s%d",message,numDet[i]);
log_message += std::to_string(numDet[i]);
if (i < MAX_DIMENSIONS-1 )
strcat(message,",");
log_message += ", ";
}
strcat(message,")");
FILE_LOG(logINFO) << message;
log_message += ")";
FILE_LOG(logINFO) << log_message;
}
void slsReceiverImplementation::setFlippedData(int axis, int enable){
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
if(axis<0 || axis>1) return;
if(axis<0 || axis>1)
return;
flippedData[axis] = enable==0?0:1;
FILE_LOG(logINFO) << "Flipped Data: " << flippedData[0] << " , " << flippedData[1];
}
@ -439,11 +426,8 @@ int slsReceiverImplementation::setGapPixelsEnable(const bool b) {
// side effects
generalData->SetGapPixelsEnable(b, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
for (const auto& it : dataProcessor)
it->SetPixelDimension();
if (SetupFifoStructure() == FAIL)
return FAIL;
}
@ -463,9 +447,9 @@ void slsReceiverImplementation::setFileFormat(const fileFormat f){
fileFormatType = BINARY;
break;
}
//destroy file writer, set file format and create file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
for(const auto& it : dataProcessor)
it->SetFileFormat(f);
FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType);
}
@ -612,17 +596,15 @@ int slsReceiverImplementation::setROI(const std::vector<slsDetectorDefs::ROI> i)
generalData->SetROI(i);
framesPerFile = generalData->maxFramesPerFile;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetGeneralData(generalData);
for (const auto& it : listener)
it->SetGeneralData(generalData);
for (const auto& it : dataProcessor)
it->SetGeneralData(generalData);
for (const auto& it : dataStreamer)
it->SetGeneralData(generalData);
}
@ -667,28 +649,23 @@ int slsReceiverImplementation::setDataStreamEnable(const bool enable) {
dataStreamEnable = enable;
//data sockets have to be created again as the client ones are
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
if (enable) {
for ( int i = 0; i < numThreads; ++i ) {
try {
DataStreamer* s = new DataStreamer(i, fifo[i], &dynamicRange,
&roi, &fileIndex, flippedData, additionalJsonHeader, &silentMode);
dataStreamer.push_back(s);
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
}
catch(...) {
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
}
SetThreadPriorities();
for ( int i = 0; i < numThreads; ++i ) {
try {
dataStreamer.push_back(sls::make_unique<DataStreamer>(i, fifo[i].get(), &dynamicRange,
&roi, &fileIndex, flippedData, additionalJsonHeader, &silentMode));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
}
catch(...) {
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
}
SetThreadPriorities();
}
}
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
@ -767,7 +744,6 @@ int slsReceiverImplementation::setNumberofSamples(const uint64_t i) {
numberOfSamples = i;
generalData->setNumberofSamples(i, nroichannels);
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
@ -780,15 +756,11 @@ int slsReceiverImplementation::setNumberofSamples(const uint64_t i) {
int slsReceiverImplementation::setDynamicRange(const uint32_t i) {
if (dynamicRange != i) {
dynamicRange = i;
//side effects
generalData->SetDynamicRange(i,tengigaEnable);
generalData->SetGapPixelsEnable(gapPixelsEnable, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
for (const auto& it : dataProcessor)
it->SetPixelDimension();
if (SetupFifoStructure() == FAIL)
return FAIL;
}
@ -802,8 +774,6 @@ int slsReceiverImplementation::setTenGigaEnable(const bool b) {
tengigaEnable = b;
//side effects
generalData->SetTenGigaEnable(b,dynamicRange);
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
@ -815,8 +785,6 @@ int slsReceiverImplementation::setTenGigaEnable(const bool b) {
int slsReceiverImplementation::setFifoDepth(const uint32_t i) {
if (fifoDepth != i) {
fifoDepth = i;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
@ -887,11 +855,7 @@ int slsReceiverImplementation::setDetectorType(const detectorType d) {
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
framesPerFile = generalData->maxFramesPerFile;
//local network parameters
SetLocalNetworkParameters();
//create fifo structure
numberofJobs = -1;
if (SetupFifoStructure() == FAIL) {
FILE_LOG(logERROR) << "Could not allocate memory for fifo structure";
return FAIL;
@ -900,38 +864,32 @@ int slsReceiverImplementation::setDetectorType(const detectorType d) {
//create threads
for ( int i = 0; i < numThreads; ++i ) {
try {
Listener* l = new Listener(i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode);
listener.push_back(l);
DataProcessor* p = new DataProcessor(i, myDetectorType, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &streamingFrequency, &streamingTimerInMs,
try {
auto fifo_ptr = fifo[i].get();
listener.push_back(sls::make_unique<Listener>(i, myDetectorType, fifo_ptr, &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode));
dataProcessor.push_back(sls::make_unique<DataProcessor>(i, myDetectorType, fifo_ptr, &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &streamingFrequency, &streamingTimerInMs,
&framePadding, &activated, &deactivatedPaddingEnable, &silentMode,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
dataProcessor.push_back(p);
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
return FAIL;
}
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady));
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
listener.clear();
dataProcessor.clear();
return FAIL;
}
}
//set up writer and callbacks
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (const auto& it : listener)
it->SetGeneralData(generalData);
for (const auto& it : dataProcessor)
it->SetGeneralData(generalData);
SetThreadPriorities();
// check udp socket buffer size
@ -967,14 +925,12 @@ void slsReceiverImplementation::setDetectorPositionId(const int i){
/***acquisition functions***/
void slsReceiverImplementation::resetAcquisitionCount() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (const auto& it : listener)
it->ResetParametersforNewAcquisition();
for (const auto& it : dataProcessor)
it->ResetParametersforNewAcquisition();
for (const auto& it: dataStreamer)
it->ResetParametersforNewAcquisition();
FILE_LOG(logINFO) << "Acquisition Count has been reset";
}
@ -996,7 +952,7 @@ int slsReceiverImplementation::startReceiver(char *c) {
//callbacks
if (startAcquisitionCallBack) {
startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
(generalData->imageSize) + (generalData->fifoBufferHeaderSize), pStartAcquisition);
if (rawDataReadyCallBack != NULL) {
FILE_LOG(logINFO) << "Data Write has been defined externally";
}
@ -1037,14 +993,15 @@ void slsReceiverImplementation::stopReceiver(){
//wait for the processes (Listener and DataProcessor) to be done
bool running = true;
while(running) {
running = false;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
if ((*it)->IsRunning())
running = false;
for (const auto& it : listener)
if (it->IsRunning())
running = true;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
if ((*it)->IsRunning())
for (const auto& it : dataProcessor)
if (it->IsRunning())
running = true;
usleep(5000);
usleep(5000);
}
@ -1052,9 +1009,9 @@ void slsReceiverImplementation::stopReceiver(){
if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
for (const auto& it : dataProcessor) {
maxIndexCaught = std::max(maxIndexCaught, it->GetProcessedMeasurementIndex());
if(it->GetMeasurementStartedFlag())
anycaught = true;
}
//to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
@ -1063,13 +1020,13 @@ void slsReceiverImplementation::stopReceiver(){
//wait for the processes (DataStreamer) to be done
running = true;
while(running) {
running = false;
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
if ((*it)->IsRunning())
while(running) {
running = false;
for (const auto& it : dataStreamer)
if (it->IsRunning())
running = true;
usleep(5000);
}
usleep(5000);
}
status = RUN_FINISHED;
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
@ -1107,46 +1064,39 @@ void slsReceiverImplementation::stopReceiver(){
void slsReceiverImplementation::startReadout(){
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
if(status == RUNNING){
// wait for incoming delayed packets
//current packets caught
volatile int totalP = 0,prev=-1;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
int totalPacketsReceived = 0;
int previousValue=-1;
for(const auto& it : listener)
totalPacketsReceived += it->GetPacketsCaught();
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
const auto numPacketsToReceive = numberOfFrames * generalData->packetsPerFrame * listener.size();
if(totalPacketsReceived != numPacketsToReceive){
while(totalPacketsReceived != previousValue){
FILE_LOG(logDEBUG3) << "waiting for all packets, previousValue:" << previousValue <<
" totalPacketsReceived: " << totalPacketsReceived;
usleep(5*1000);/* TODO! Need to find optimal time **/
previousValue = totalPacketsReceived;
totalPacketsReceived = 0;
for(const auto& it : listener)
totalPacketsReceived += it->GetPacketsCaught();
//wait as long as there is change from prev totalP,
while(prev != totalP){
FILE_LOG(logDEBUG3) << "waiting for all packets prevP:" << prev <<
" totalP: " << totalP;
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
prev = totalP;
totalP = 0;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
FILE_LOG(logDEBUG3) << "\tupdated: totalP:" << totalP;
FILE_LOG(logDEBUG3) << "\tupdated: totalPacketsReceived:" << totalPacketsReceived;
}
}
//set status
status = TRANSMITTING;
FILE_LOG(logINFO) << "Status: Transmitting";
}
//shut down udp sockets so as to make listeners push dummy (end) packets for processors
//shut down udp sockets to make listeners push dummy (end) packets for processors
shutDownUDPSockets();
}
void slsReceiverImplementation::shutDownUDPSockets() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
for (const auto& it : listener)
it->ShutDownUDPSocket();
}
@ -1155,10 +1105,10 @@ void slsReceiverImplementation::closeFiles() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles();
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
for (const auto& it : dataProcessor) {
it->CloseFiles();
maxIndexCaught = std::max(maxIndexCaught, it->GetProcessedMeasurementIndex());
if(it->GetMeasurementStartedFlag())
anycaught = true;
}
//to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
@ -1169,11 +1119,10 @@ void slsReceiverImplementation::closeFiles() {
int slsReceiverImplementation::restreamStop() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
bool ret = OK;
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
if ((*it)->RestreamStop() == FAIL)
for (const auto& it : dataStreamer){
if (it->RestreamStop() == FAIL)
ret = FAIL;
}
// if fail, prints in datastreamer
if (ret == OK) {
FILE_LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful";
@ -1238,8 +1187,8 @@ void slsReceiverImplementation::SetLocalNetworkParameters() {
void slsReceiverImplementation::SetThreadPriorities() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
for (const auto& it : listener){
if (it->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Could not prioritize listener threads. (No Root Privileges?)";
return;
}
@ -1254,34 +1203,27 @@ void slsReceiverImplementation::SetThreadPriorities() {
int slsReceiverImplementation::SetupFifoStructure() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
numberofJobs = 1;
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
for ( int i = 0; i < numThreads; ++i ) {
//create fifo structure
try {
Fifo* f = new Fifo (i,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth);
fifo.push_back(f);
} catch (...) {
FILE_LOG(logERROR) << "Could not allocate memory for fifo structure of index " << i;
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
return FAIL;
}
try {
fifo.push_back(sls::make_unique<Fifo>(i,
(generalData->imageSize) + (generalData->fifoBufferHeaderSize),
fifoDepth));
} catch (...) {
FILE_LOG(logERROR) << "Could not allocate memory for fifo structure of index " << i;
fifo.clear();
return FAIL;
}
//set the listener & dataprocessor threads to point to the right fifo
if(listener.size())listener[i]->SetFifo(fifo[i]);
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]);
if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i]);
if(listener.size())listener[i]->SetFifo(fifo[i].get());
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i].get());
if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i].get());
}
FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ;
FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ;
FILE_LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed";
return OK;
}
@ -1290,16 +1232,17 @@ int slsReceiverImplementation::SetupFifoStructure() {
void slsReceiverImplementation::ResetParametersforNewMeasurement() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (const auto& it : listener)
it->ResetParametersforNewMeasurement();
for (const auto& it : dataProcessor)
it->ResetParametersforNewMeasurement();
if (dataStreamEnable) {
char fnametostream[MAX_STR_LENGTH];
snprintf(fnametostream, MAX_STR_LENGTH, "%s/%s", filePath, fileName);
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement(fnametostream);
for (const auto& it : dataStreamer)
it->ResetParametersforNewMeasurement(fnametostream);
}
}
@ -1345,16 +1288,16 @@ int slsReceiverImplementation::SetupWriter() {
void slsReceiverImplementation::StartRunning() {
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
//set running mask and post semaphore to start the inner loop in execution thread
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
(*it)->StartRunning();
(*it)->Continue();
for (const auto& it : listener){
it->StartRunning();
it->Continue();
}
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
for (const auto& it : dataProcessor){
it->StartRunning();
it->Continue();
}
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
for (const auto& it : dataStreamer){
it->StartRunning();
it->Continue();
}
}