client: moved shortenable to roi in reciever, roi not yet written in master file

This commit is contained in:
2018-09-19 17:35:26 +02:00
parent 961489edb1
commit c784f0f539
38 changed files with 615 additions and 459 deletions

View File

@ -16,7 +16,6 @@
#include <cstring> //strcpy
#include <errno.h> //eperm
#include <fstream>
using namespace std;
/** cosntructor & destructor */
@ -33,16 +32,16 @@ UDPStandardImplementation::~UDPStandardImplementation() {
void UDPStandardImplementation::DeleteMembers() {
if (generalData) { delete generalData; generalData=0;}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
}
@ -69,8 +68,8 @@ uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
vector<DataProcessor*>::const_iterator it;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
std::vector<DataProcessor*>::const_iterator it;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0);
sum += (*it)->GetNumTotalFramesCaught();
}
@ -85,7 +84,7 @@ uint64_t UDPStandardImplementation::getFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetNumFramesCaught();
}
@ -100,7 +99,7 @@ int64_t UDPStandardImplementation::getAcquisitionIndex() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetActualProcessedAcquisitionIndex();
}
@ -120,7 +119,7 @@ int UDPStandardImplementation::setGapPixelsEnable(const bool b) {
// side effects
generalData->SetGapPixelsEnable(b, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
@ -144,7 +143,7 @@ void UDPStandardImplementation::setFileFormat(const fileFormat f){
break;
}
//destroy file writer, set file format and create file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType);
@ -168,35 +167,64 @@ void UDPStandardImplementation::setFileWriteEnable(const bool b){
int UDPStandardImplementation::setShortFrameEnable(const int i) {
int UDPStandardImplementation::setROI(const std::vector<slsReceiverDefs::ROI*> i) {
if (myDetectorType != GOTTHARD) {
cprintf(RED, "Error: Can not set short frame for this detector\n");
cprintf(RED, "Error: Can not set ROI for this detector\n");
return FAIL;
}
if (shortFrameEnable != i) {
shortFrameEnable = i;
if (generalData)
delete generalData;
if (i != -1)
generalData = new ShortGotthardData();
else
generalData = new GotthardData();
bool change = false;
if (roi.size() != i.size())
change = true;
else {
for (unsigned int iloop = 0; iloop < i.size(); ++iloop) {
if (
(roi[iloop]->xmin != i[iloop]->xmin) ||
(roi[iloop]->xmax != i[iloop]->xmax) ||
(roi[iloop]->ymin != i[iloop]->ymin) ||
(roi[iloop]->xmax != i[iloop]->xmax)) {
change = true;
break;
}
}
}
if (change) {
roi = i;
generalData->SetROI(i);
framesPerFile = generalData->maxFramesPerFile;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetGeneralData(generalData);
}
FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable;
std::stringstream sstm;
sstm << "ROI: ";
if (!roi.size())
sstm << "0";
else {
for (unsigned int i = 0; i < roi.size(); ++i) {
sstm << "( " <<
roi[i]->xmin << ", " <<
roi[i]->xmax << ", " <<
roi[i]->ymin << ", " <<
roi[i]->ymax << " )";
}
}
std::string message = sstm.str();
FILE_LOG(logINFO) << message;
return OK;
}
@ -216,21 +244,22 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
dataStreamEnable = enable;
//data sockets have to be created again as the client ones are
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
if (enable) {
for ( int i = 0; i < numThreads; ++i ) {
try {
DataStreamer* s = new DataStreamer(i, fifo[i], &dynamicRange,
&shortFrameEnable, &fileIndex, flippedData, additionalJsonHeader, &silentMode);
&roi, &fileIndex, flippedData, additionalJsonHeader, &silentMode);
dataStreamer.push_back(s);
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
}
catch(...) {
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
@ -269,7 +298,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
generalData->SetDynamicRange(i,tengigaEnable);
generalData->SetGapPixelsEnable(gapPixelsEnable, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
@ -374,10 +403,10 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
return FAIL;
@ -385,9 +414,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
//set up writer and callbacks
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
SetThreadPriorities();
@ -427,13 +456,13 @@ void UDPStandardImplementation::setDetectorPositionId(const int i){
void UDPStandardImplementation::resetAcquisitionCount() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
FILE_LOG(logINFO) << "Acquisition Count has been reset";
@ -498,10 +527,10 @@ void UDPStandardImplementation::stopReceiver(){
bool running = true;
while(running) {
running = false;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
if ((*it)->IsRunning())
running = true;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
@ -512,8 +541,8 @@ void UDPStandardImplementation::stopReceiver(){
if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
@ -525,7 +554,7 @@ void UDPStandardImplementation::stopReceiver(){
running = true;
while(running) {
running = false;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
@ -575,7 +604,7 @@ void UDPStandardImplementation::startReadout(){
// wait for incoming delayed packets
//current packets caught
volatile int totalP = 0,prev=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//wait for all packets
@ -594,7 +623,7 @@ void UDPStandardImplementation::startReadout(){
prev = totalP;
totalP = 0;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
@ -613,7 +642,7 @@ void UDPStandardImplementation::startReadout(){
void UDPStandardImplementation::shutDownUDPSockets() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
}
@ -622,9 +651,9 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles();
maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
@ -640,7 +669,7 @@ int UDPStandardImplementation::setUDPSocketBufferSize(const uint32_t s) {
int UDPStandardImplementation::restreamStop() {
bool ret = OK;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
if ((*it)->RestreamStop() == FAIL)
ret = FAIL;
}
@ -660,14 +689,14 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
int max_back_log;
const char *proc_file_name = "/proc/sys/net/core/netdev_max_backlog";
{
ifstream proc_file(proc_file_name);
std::ifstream proc_file(proc_file_name);
proc_file >> max_back_log;
}
if (max_back_log < MAX_SOCKET_INPUT_PACKET_QUEUE) {
ofstream proc_file(proc_file_name);
std::ofstream proc_file(proc_file_name);
if (proc_file.good()) {
proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << endl;
proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << std::endl;
cprintf(GREEN, "Max length of input packet queue "
"[/proc/sys/net/core/netdev_max_backlog] modified to %d\n",
MAX_SOCKET_INPUT_PACKET_QUEUE);
@ -683,13 +712,13 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
void UDPStandardImplementation::SetThreadPriorities() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Could not prioritize listener threads. (No Root Privileges?)";
return;
}
}
ostringstream osfn;
std::ostringstream osfn;
osfn << "Priorities set - "
"Listener:" << LISTENER_PRIORITY;
@ -701,7 +730,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
numberofJobs = 1;
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
for ( int i = 0; i < numThreads; i++ ) {
@ -714,7 +743,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
fifo.push_back(f);
} catch (...) {
cprintf(RED,"Error: Could not allocate memory for fifo structure of index %d\n", i);
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
return FAIL;
@ -733,15 +762,15 @@ int UDPStandardImplementation::SetupFifoStructure() {
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
if (dataStreamEnable) {
char fnametostream[MAX_STR_LENGTH];
snprintf(fnametostream, MAX_STR_LENGTH, "%s/%s", filePath, fileName);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement(fnametostream);
}
}
@ -785,15 +814,15 @@ int UDPStandardImplementation::SetupWriter() {
void UDPStandardImplementation::StartRunning() {
//set running mask and post semaphore to start the inner loop in execution thread
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
(*it)->StartRunning();
(*it)->Continue();
}
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}