/********************************************//** * @file UDPStandardImplementation.cpp * @short does all the functions for a receiver, set/get parameters, start/stop etc. ***********************************************/ #include "UDPStandardImplementation.h" #include "GeneralData.h" #include "Listener.h" #include "DataProcessor.h" #include "DataStreamer.h" #include "Fifo.h" #include "ZmqSocket.h" //just for the zmq port define #include //system #include //strcpy #include //eperm #include //ceil using namespace std; /** cosntructor & destructor */ UDPStandardImplementation::UDPStandardImplementation() { InitializeMembers(); } UDPStandardImplementation::~UDPStandardImplementation() { DeleteMembers(); } void UDPStandardImplementation::DeleteMembers() { if (generalData) { delete generalData; generalData=0;} for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) delete(*it); listener.clear(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) delete(*it); dataProcessor.clear(); for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) delete(*it); dataStreamer.clear(); for (vector::const_iterator it = fifo.begin(); it != fifo.end(); ++it) delete(*it); fifo.clear(); } void UDPStandardImplementation::InitializeMembers() { UDPBaseImplementation::initializeMembers(); acquisitionPeriod = SAMPLE_TIME_IN_NS; //*** receiver parameters *** numThreads = 1; numberofJobs = 1; nroichannels = 0; //** class objects *** generalData = 0; } /*** Overloaded Functions called by TCP Interface ***/ uint64_t UDPStandardImplementation::getTotalFramesCaught() const { uint64_t sum = 0; uint32_t flagsum = 0; vector::const_iterator it; for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0); sum += (*it)->GetNumTotalFramesCaught(); } //no data processed if (flagsum != dataProcessor.size()) return 0; return (sum/dataProcessor.size()); } uint64_t UDPStandardImplementation::getFramesCaught() const { uint64_t sum = 0; uint32_t flagsum = 0; for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0); sum += (*it)->GetNumFramesCaught(); } //no data processed if (flagsum != dataProcessor.size()) return 0; return (sum/dataProcessor.size()); } int64_t UDPStandardImplementation::getAcquisitionIndex() const { uint64_t sum = 0; uint32_t flagsum = 0; for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){ flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0); sum += (*it)->GetActualProcessedAcquisitionIndex(); } //no data processed if (flagsum != dataProcessor.size()) return -1; return (sum/dataProcessor.size()); } void UDPStandardImplementation::setFileFormat(const fileFormat f){ switch(f){ #ifdef HDF5C case HDF5: fileFormatType = HDF5; break; #endif default: fileFormatType = BINARY; break; } //destroy file writer, set file format and create file writer for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->SetFileFormat(f); FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType); } int UDPStandardImplementation::setShortFrameEnable(const int i) { if (myDetectorType != GOTTHARD) { bprintf(RED, "Error: Can not set short frame for this detector\n"); return FAIL; } if (shortFrameEnable != i) { shortFrameEnable = i; if (generalData) delete generalData; if (i != -1) generalData = new ShortGotthardData(); else generalData = new GotthardData(); numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure if (SetupFifoStructure() == FAIL) return FAIL; for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->SetGeneralData(generalData); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->SetGeneralData(generalData); for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) (*it)->SetGeneralData(generalData); } FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable; return OK; } int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) { if (frameToGuiFrequency != freq) { frameToGuiFrequency = freq; } FILE_LOG (logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency; return OK; } int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\ if (dataStreamEnable != enable) { dataStreamEnable = enable; //data sockets have to be created again as the client ones are for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) delete(*it); dataStreamer.clear(); if (enable) { bool error = false; for ( int i = 0; i < numThreads; ++i ) { dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &shortFrameEnable)); dataStreamer[i]->SetGeneralData(generalData); // check again if (streamingPort == 0) streamingPort = DEFAULT_ZMQ_PORTNO + (detID * ((myDetectorType == EIGER) ? 2 : 1) ); // multiplied by 2 as eiger has 2 ports if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP) == FAIL) { error = true; break; } } if (DataStreamer::GetErrorMask() || error) { if (DataStreamer::GetErrorMask()) bprintf(RED,"Error: Could not create data callback threads\n"); else bprintf(RED,"Error: Could not create zmq sockets\n"); for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) delete(*it); dataStreamer.clear(); dataStreamEnable = false; return FAIL; } SetThreadPriorities(); } } FILE_LOG (logINFO) << "Data Send to Gui: " << dataStreamEnable; return OK; } int UDPStandardImplementation::setNumberofSamples(const uint64_t i) { if (numberOfSamples != i) { numberOfSamples = i; //side effects uint32_t ppf = ceil(double(2 * (nroichannels ? nroichannels : DEFAULT_NROI_CHANNELS) * numberOfSamples) / double(generalData->dataSize)); generalData->SetPacketsPerFrame(ppf); numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure if (SetupFifoStructure() == FAIL) return FAIL; } FILE_LOG (logINFO) << "Number of Samples: " << numberOfSamples; FILE_LOG (logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); return OK; } int UDPStandardImplementation::setDynamicRange(const uint32_t i) { if (dynamicRange != i) { dynamicRange = i; //side effects generalData->SetDynamicRange(i,tengigaEnable); numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure if (SetupFifoStructure() == FAIL) return FAIL; } FILE_LOG (logINFO) << "Dynamic Range: " << dynamicRange; return OK; } int UDPStandardImplementation::setTenGigaEnable(const bool b) { if (tengigaEnable != b) { tengigaEnable = b; //side effects generalData->SetTenGigaEnable(b,dynamicRange); numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure if (SetupFifoStructure() == FAIL) return FAIL; } FILE_LOG (logINFO) << "Ten Giga: " << stringEnable(tengigaEnable); return OK; } int UDPStandardImplementation::setFifoDepth(const uint32_t i) { if (fifoDepth != i) { fifoDepth = i; numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure if (SetupFifoStructure() == FAIL) return FAIL; } FILE_LOG (logINFO) << "Fifo Depth: " << i; return OK; } int UDPStandardImplementation::setDetectorType(const detectorType d) { FILE_LOG (logDEBUG) << "Setting receiver type"; DeleteMembers(); InitializeMembers(); myDetectorType = d; switch(myDetectorType) { case GOTTHARD: case PROPIX: case MOENCH: case EIGER: case JUNGFRAUCTB: case JUNGFRAU: FILE_LOG (logINFO) << " ***** " << getDetectorType(d) << " Receiver *****"; break; default: FILE_LOG (logERROR) << "This is an unknown receiver type " << (int)d; return FAIL; } //set detector specific variables switch(myDetectorType) { case GOTTHARD: generalData = new GotthardData(); break; case PROPIX: generalData = new PropixData(); break; case MOENCH: generalData = new Moench02Data(); break; case EIGER: generalData = new EigerData(); break; case JUNGFRAUCTB: generalData = new JCTBData(); break; case JUNGFRAU: generalData = new JungfrauData(); break; default: break; } numThreads = generalData->threadsPerReceiver; fifoDepth = generalData->defaultFifoDepth; //local network parameters SetLocalNetworkParameters(); //create fifo structure numberofJobs = -1; if (SetupFifoStructure() == FAIL) { FILE_LOG (logERROR) << "Error: Could not allocate memory for fifo structure"; return FAIL; } //create threads for ( int i=0; i < numThreads; ++i ) { listener.push_back(new Listener(myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange)); dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, &frameToGuiFrequency, &frameToGuiTimerinMS, rawDataReadyCallBack,pRawDataReady)); if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")"; for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) delete(*it); listener.clear(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) delete(*it); dataProcessor.clear(); return FAIL; } } //set up writer and callbacks for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->SetGeneralData(generalData); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { (*it)->SetGeneralData(generalData); } SetThreadPriorities(); FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d); return OK; } void UDPStandardImplementation::setDetectorPositionId(const int i){ detID = i; FILE_LOG(logINFO) << "Detector Position Id:" << detID; for (unsigned int i = 0; i < dataProcessor.size(); ++i) { dataProcessor[i]->SetupFileWriter((int*)numDet, fileName, filePath, &fileIndex, &overwriteEnable, &detID, &numThreads, &numberOfFrames, &dynamicRange, &udpPortNum[i], generalData); } } void UDPStandardImplementation::resetAcquisitionCount() { for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ResetParametersforNewAcquisition(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->ResetParametersforNewAcquisition(); for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) (*it)->ResetParametersforNewAcquisition(); FILE_LOG (logINFO) << "Acquisition Count has been reset"; } int UDPStandardImplementation::startReceiver(char *c) { bprintf(GRAY,"\n"); FILE_LOG(logINFO) << "Starting Receiver"; ResetParametersforNewMeasurement(); //listener if (CreateUDPSockets() == FAIL) { strcpy(c,"Could not create UDP Socket(s)."); FILE_LOG(logERROR) << c; return FAIL; } //callbacks if (startAcquisitionCallBack) { startAcquisitionCallBack(filePath, fileName, fileIndex, (generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition); if (rawDataReadyCallBack != NULL) { FILE_LOG(logINFO) << "Data Write has been defined externally"; } } //processor->writer if (fileWriteEnable) { if (SetupWriter() == FAIL) { strcpy(c,"Could not create file."); FILE_LOG(logERROR) << c; return FAIL; } } else FILE_LOG(logINFO) << "File Write Disabled"; FILE_LOG(logINFO) << "Ready ..."; //status status = RUNNING; //Let Threads continue to be ready for acquisition StartRunning(); FILE_LOG(logINFO) << "Receiver Started"; FILE_LOG(logINFO) << "Status: " << runStatusType(status); return OK; } void UDPStandardImplementation::stopReceiver(){ FILE_LOG(logINFO) << "Stopping Receiver"; //set status to transmitting startReadout(); //wait for the processes to be done while(Listener::GetRunningMask()){ usleep(5000); } while(DataProcessor::GetRunningMask()){ usleep(5000); } //create virtual file if (fileWriteEnable && fileFormatType == HDF5) { uint64_t maxIndexCaught = 0; bool anycaught = false; for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex()); if((*it)->GetMeasurementStartedFlag()) anycaught = true; } if (anycaught) dataProcessor[0]->EndofAcquisition(maxIndexCaught); //to create virtual file } while(DataStreamer::GetRunningMask()){ usleep(5000); } status = RUN_FINISHED; FILE_LOG(logINFO) << "Status: " << runStatusType(status); { //statistics uint64_t tot = 0; for (int i = 0; i < numThreads; i++) { tot += dataProcessor[i]->GetNumFramesCaught(); uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught(); if (missingpackets) { bprintf(RED, "\n[Port %d]\n",udpPortNum[i]); bprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets); bprintf(RED, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); bprintf(RED, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); }else{ bprintf(GREEN, "\n[Port %d]\n",udpPortNum[i]); bprintf(GREEN, "Missing Packets\t\t: %lld\n",(long long int)missingpackets); bprintf(GREEN, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); bprintf(GREEN, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught()); } } if(!activated) bprintf(RED,"Note: Deactivated Receiver\n"); //callback if (acquisitionFinishedCallBack) acquisitionFinishedCallBack((tot/numThreads), pAcquisitionFinished); } //change status status = IDLE; FILE_LOG(logINFO) << "Receiver Stopped"; FILE_LOG(logINFO) << "Status: " << runStatusType(status); } void UDPStandardImplementation::startReadout(){ if(status == RUNNING){ //needs to wait for packets only if activated if(activated){ //current packets caught volatile int totalP = 0,prev=-1; for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) totalP += (*it)->GetPacketsCaught(); //wait for all packets if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){ //wait as long as there is change from prev totalP, while(prev != totalP){ #ifdef VERY_VERBOSE bprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n", prev,totalP); #endif //usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000); usleep(5*1000);/* Need to find optimal time **/ prev = totalP; totalP = 0; for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) totalP += (*it)->GetPacketsCaught(); #ifdef VERY_VERBOSE bprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP); #endif } } } //set status status = TRANSMITTING; FILE_LOG(logINFO) << "Status: Transmitting"; } //shut down udp sockets so as to make listeners push dummy (end) packets for processors shutDownUDPSockets(); } void UDPStandardImplementation::shutDownUDPSockets() { for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ShutDownUDPSocket(); } void UDPStandardImplementation::closeFiles() { uint64_t maxIndexCaught = 0; for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { (*it)->CloseFiles(); maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex()); } if (maxIndexCaught) dataProcessor[0]->EndofAcquisition(maxIndexCaught); } void UDPStandardImplementation::SetLocalNetworkParameters() { //to increase socket receiver buffer size and max length of input queue by changing kernel settings if (myDetectorType == EIGER) return; char command[255]; //to increase Socket Receiver Buffer size sprintf(command,"echo $((%d)) > /proc/sys/net/core/rmem_max",RECEIVE_SOCKET_BUFFER_SIZE); if (system(command)) { FILE_LOG (logWARNING) << "No root privileges to change Socket Receiver Buffer size (net.core.rmem_max)"; return; } FILE_LOG (logINFO) << "Socket Receiver Buffer size (/proc/sys/net/core/rmem_max) modified to " << RECEIVE_SOCKET_BUFFER_SIZE ; // to increase Max length of input packet queue sprintf(command,"echo %d > /proc/sys/net/core/netdev_max_backlog",MAX_SOCKET_INPUT_PACKET_QUEUE); if (system(command)) { FILE_LOG (logWARNING) << "No root privileges to change Max length of input packet queue (net.core.rmem_max)"; return; } FILE_LOG (logINFO) << "Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog) modified to " << MAX_SOCKET_INPUT_PACKET_QUEUE ; } void UDPStandardImplementation::SetThreadPriorities() { for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it){ if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) { FILE_LOG(logWARNING) << "No root privileges to prioritize listener threads"; return; } } ostringstream osfn; osfn << "Priorities set - " "Listener:" << LISTENER_PRIORITY; FILE_LOG(logINFO) << osfn.str(); } int UDPStandardImplementation::SetupFifoStructure() { numberofJobs = 1; for (vector::const_iterator it = fifo.begin(); it != fifo.end(); ++it) delete(*it); fifo.clear(); for ( int i = 0; i < numThreads; i++ ) { //create fifo structure bool success = true; fifo.push_back( new Fifo ( (generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize), fifoDepth, success)); if (!success) { bprintf(RED,"Error: Could not allocate memory for fifo structure of index %d\n", i); for (vector::const_iterator it = fifo.begin(); it != fifo.end(); ++it) delete(*it); fifo.clear(); return FAIL; } //set the listener & dataprocessor threads to point to the right fifo if(listener.size())listener[i]->SetFifo(fifo[i]); if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]); if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i]); } FILE_LOG (logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ; FILE_LOG (logINFO) << " Fifo structure(s) reconstructed: " << numThreads; return OK; } void UDPStandardImplementation::ResetParametersforNewMeasurement() { Listener::ResetRunningMask(); DataProcessor::ResetRunningMask(); DataStreamer::ResetRunningMask(); for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) (*it)->ResetParametersforNewMeasurement(); for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) (*it)->ResetParametersforNewMeasurement(); if (dataStreamEnable) { char fnametostream[1000]; sprintf(fnametostream, "%s/%s", filePath, fileName); for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) (*it)->ResetParametersforNewMeasurement(fnametostream); } } int UDPStandardImplementation::CreateUDPSockets() { bool error = false; for (unsigned int i = 0; i < listener.size(); ++i) if (listener[i]->CreateUDPSockets() == FAIL) { error = true; break; } if (error) { shutDownUDPSockets(); return FAIL; } FILE_LOG(logDEBUG) << "UDP socket(s) created successfully."; return OK; } int UDPStandardImplementation::SetupWriter() { bool error = false; for (unsigned int i = 0; i < dataProcessor.size(); ++i) if (dataProcessor[i]->CreateNewFile(tengigaEnable, numberOfFrames, acquisitionTime, subExpTime, acquisitionPeriod) == FAIL) { error = true; break; } if (error) { shutDownUDPSockets(); closeFiles(); return FAIL; } return OK; } void UDPStandardImplementation::StartRunning() { //set running mask and post semaphore to start the inner loop in execution thread for (vector::const_iterator it = listener.begin(); it != listener.end(); ++it) { (*it)->StartRunning(); (*it)->Continue(); } for (vector::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){ (*it)->StartRunning(); (*it)->Continue(); } for (vector::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){ (*it)->StartRunning(); (*it)->Continue(); } }