mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-14 22:07:12 +02:00
somewhere in between.. next step include generalData pointer in listerner and dataprocessor calss in constructor and a setter
This commit is contained in:
@ -23,22 +23,22 @@ using namespace std;
|
||||
/** cosntructor & destructor */
|
||||
|
||||
UDPStandardImplementation::UDPStandardImplementation() {
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
InitializeMembers();
|
||||
|
||||
}
|
||||
|
||||
|
||||
UDPStandardImplementation::~UDPStandardImplementation(){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
UDPStandardImplementation::~UDPStandardImplementation() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
DeleteMembers();
|
||||
}
|
||||
|
||||
|
||||
void UDPStandardImplementation::DeleteMembers() {
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
|
||||
if (generalData){ delete generalData; generalData=0;}
|
||||
if (generalData) { delete generalData; generalData=0;}
|
||||
listener.clear();
|
||||
dataProcessor.clear();
|
||||
dataStreamer.clear();
|
||||
@ -47,8 +47,8 @@ void UDPStandardImplementation::DeleteMembers() {
|
||||
}
|
||||
|
||||
|
||||
void UDPStandardImplementation::InitializeMembers(){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
void UDPStandardImplementation::InitializeMembers() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
|
||||
UDPBaseImplementation::initializeMembers();
|
||||
acquisitionPeriod = SAMPLE_TIME_IN_NS;
|
||||
@ -70,90 +70,40 @@ void UDPStandardImplementation::InitializeMembers(){
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setDetectorType(const detectorType d) {
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
|
||||
/*** Overloaded Functions called by TCP Interface ***/
|
||||
|
||||
numThreads = EIGER_PORTS_PER_READOUT;
|
||||
numberofJobs = 1;
|
||||
//killing all threads, deleting members etc.
|
||||
uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
uint64_t sum = 0;
|
||||
vector<DataProcessor*>::const_iterator it;
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
sum += (*it)->GetNumTotalFramesCaught();
|
||||
return (sum/dataProcessor.size());
|
||||
}
|
||||
|
||||
uint64_t UDPStandardImplementation::getFramesCaught() const {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
uint64_t sum = 0;
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
sum += (*it)->GetNumFramesCaught();
|
||||
return (sum/dataProcessor.size());
|
||||
}
|
||||
|
||||
//generalData = new GotthardData();
|
||||
|
||||
|
||||
|
||||
//only at start or changing parameters
|
||||
for ( int i=0; i < 1; i++ ) {//numthreads; i++ ) {
|
||||
|
||||
bool success = true;
|
||||
fifo.push_back(new Fifo(1024*256,5, success));
|
||||
if (!success) cprintf(RED,"not successful\n");
|
||||
|
||||
|
||||
listener.push_back(new Listener(fifo[i]));
|
||||
dataProcessor.push_back(new DataProcessor(fifo[i]));
|
||||
//dataStreamer.push_back(new DataStreamer(fifo[i]));
|
||||
|
||||
|
||||
//listener[i]->SetFifo(fifo[i]);
|
||||
//dataProcessor[i]->SetFifo(fifo[i]);
|
||||
|
||||
fileWriter.push_back(new BinaryFileWriter(fileName));
|
||||
|
||||
}
|
||||
|
||||
|
||||
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()){
|
||||
cprintf(RED, "Error in creating threads\n");
|
||||
}
|
||||
|
||||
|
||||
//start receiver functions
|
||||
//create udp sockets
|
||||
//create file
|
||||
//reset status
|
||||
//reset all masks
|
||||
Listener::ResetRunningMask();
|
||||
DataProcessor::ResetRunningMask();
|
||||
//DataStreamer::ResetRunningMask();
|
||||
|
||||
|
||||
for( unsigned int i=0; i < listener.size();i++ ) {
|
||||
listener[i]->StartRunning();
|
||||
dataProcessor[i]->StartRunning();
|
||||
listener[i]->Continue();
|
||||
dataProcessor[i]->Continue();
|
||||
}
|
||||
|
||||
|
||||
// for (vector<Listener*>::iterator it = listener.begin(); it != listener.end(); ++it) {
|
||||
//*it->StartRunning();
|
||||
|
||||
usleep (5 * 1000 * 1000);
|
||||
|
||||
|
||||
SetLocalNetworkParameters();
|
||||
|
||||
return OK;
|
||||
int64_t UDPStandardImplementation::getAcquisitionIndex() const {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
//no data processed
|
||||
if(!DataProcessor::GetAcquisitionStartedFlag())
|
||||
return -1;
|
||||
uint64_t sum = 0;
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
sum += (*it)->GetProcessedAcquisitionIndex();
|
||||
return (sum/dataProcessor.size());
|
||||
}
|
||||
|
||||
|
||||
uint64_t UDPStandardImplementation::getTotalFramesCaught() const{
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
//preventing divide by 0 using ternary operator
|
||||
return (totalPacketsCaught/(packetsPerFrame*(listener.size()>0?listener.size():1)));
|
||||
}
|
||||
|
||||
uint64_t UDPStandardImplementation::getFramesCaught() const{
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
//preventing divide by 0 using ternary operator
|
||||
return (packetsCaught/(packetsPerFrame*(listener.size()>0?listener.size():1)));
|
||||
}
|
||||
|
||||
|
||||
void UDPStandardImplementation::setFileName(const char c[]){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " starting";
|
||||
void UDPStandardImplementation::setFileName(const char c[]) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
|
||||
if (strlen(c)) {
|
||||
strcpy(fileName, c); //automatically update fileName in Filewriter (pointer)
|
||||
@ -168,12 +118,12 @@ void UDPStandardImplementation::setFileName(const char c[]){
|
||||
if (detindex == -1)
|
||||
detID = 0;
|
||||
}
|
||||
FILE_LOG(logINFO) << "File name:" << fileName;
|
||||
FILE_LOG (logINFO) << "File name:" << fileName;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setShortFrameEnable(const int i){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
int UDPStandardImplementation::setShortFrameEnable(const int i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (myDetectorType != GOTTHARD) {
|
||||
cprintf(RED, "Error: Can not set short frame for this detector\n");
|
||||
@ -190,47 +140,47 @@ int UDPStandardImplementation::setShortFrameEnable(const int i){
|
||||
else
|
||||
generalData = new GotthardData();
|
||||
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
|
||||
if(SetupFifoStructure() == FAIL)
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
}
|
||||
FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable;
|
||||
FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (frameToGuiFrequency != freq){
|
||||
if (frameToGuiFrequency != freq) {
|
||||
frameToGuiFrequency = freq;
|
||||
|
||||
//only the ones lisening to more than 1 frame at a time needs to change fifo structure
|
||||
switch (myDetectorType) {
|
||||
case GOTTHARD:
|
||||
case PROPIX:
|
||||
if(SetupFifoStructure() == FAIL)
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency;
|
||||
FILE_LOG (logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setDataStreamEnable(const bool enable){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (dataStreamEnable != enable) {
|
||||
dataStreamEnable = enable;
|
||||
|
||||
//data sockets have to be created again as the client ones are
|
||||
if(dataStreamer.size())
|
||||
if (dataStreamer.size())
|
||||
dataStreamer.clear();
|
||||
|
||||
if(enable){
|
||||
if (enable) {
|
||||
for ( int i=0; i < numThreads; ++i ) {
|
||||
dataStreamer.push_back(new DataStreamer());
|
||||
if (DataStreamer::GetErrorMask()) {
|
||||
@ -240,13 +190,13 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable){
|
||||
}
|
||||
}
|
||||
}
|
||||
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
|
||||
FILE_LOG (logINFO) << "Data Send to Gui: " << dataStreamEnable;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (acquisitionPeriod != i) {
|
||||
acquisitionPeriod = i;
|
||||
@ -255,27 +205,322 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){
|
||||
switch (myDetectorType) {
|
||||
case GOTTHARD:
|
||||
case PROPIX:
|
||||
if(setupFifoStructure() == FAIL)
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
break;
|
||||
case EIGER:
|
||||
if (fileFormatType == BINARY)
|
||||
for (int i=0; i<numThreads; ++i )
|
||||
updateFileHeader(i); /*????????*/
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s";
|
||||
FILE_LOG (logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s";
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (acquisitionTime != i) {
|
||||
acquisitionTime = i;
|
||||
|
||||
//only the ones lisening to more than 1 frame at a time needs to change fifo structure
|
||||
switch (myDetectorType) {
|
||||
case GOTTHARD:
|
||||
case PROPIX:
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
FILE_LOG (logINFO) << "Acquisition Period: " << (double)acquisitionTime/(1E9) << "s";
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (numberOfFrames != i) {
|
||||
numberOfFrames = i;
|
||||
|
||||
//only the ones lisening to more than 1 frame at a time needs to change fifo structure
|
||||
switch (myDetectorType) {
|
||||
case GOTTHARD:
|
||||
case PROPIX:
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
FILE_LOG (logINFO) << "Number of Frames:" << numberOfFrames;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (dynamicRange != i) {
|
||||
dynamicRange = i;
|
||||
//side effects
|
||||
generalData->SetDynamicRange(i,tengigaEnable);
|
||||
|
||||
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
}
|
||||
FILE_LOG (logINFO) << "Dynamic Range: " << dynamicRange;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setTenGigaEnable(const bool b) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (tengigaEnable != b) {
|
||||
tengigaEnable = b;
|
||||
//side effects
|
||||
generalData->SetTenGigaEnable(tengigaEnable,b);
|
||||
|
||||
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
}
|
||||
FILE_LOG (logINFO) << "Ten Giga: " << stringEnable(tengigaEnable);
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
if (fifoDepth != i) {
|
||||
fifoDepth = i;
|
||||
|
||||
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
|
||||
if (SetupFifoStructure() == FAIL)
|
||||
return FAIL;
|
||||
}
|
||||
FILE_LOG (logINFO) << "Fifo Depth: " << i << endl;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int UDPStandardImplementation::setDetectorType(const detectorType d) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
|
||||
void UDPStandardImplementation::SetLocalNetworkParameters(){
|
||||
FILE_LOG (logDEBUG) << "Setting receiver type";
|
||||
|
||||
DeleteMembers();
|
||||
InitializeMembers();
|
||||
myDetectorType = d;
|
||||
switch(myDetectorType) {
|
||||
case GOTTHARD:
|
||||
case PROPIX:
|
||||
case MOENCH:
|
||||
case EIGER:
|
||||
case JUNGFRAUCTB:
|
||||
case JUNGFRAU:
|
||||
FILE_LOG (logINFO) << " ***** " << getDetectorType(d) << " Receiver *****";
|
||||
break;
|
||||
default:
|
||||
FILE_LOG (logERROR) << "This is an unknown receiver type " << (int)d;
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
|
||||
//set detector specific variables
|
||||
switch(myDetectorType) {
|
||||
case GOTTHARD: generalData = new GotthardData(); break;
|
||||
case PROPIX: generalData = new PropixData(); break;
|
||||
case MOENCH: generalData = new Moench02Data(); break;
|
||||
case EIGER: generalData = new EigerData(); break;
|
||||
case JUNGFRAUCTB: generalData = new JCTBData(); break;
|
||||
case JUNGFRAU: generalData = new JungfrauData(); break;
|
||||
default: break;
|
||||
}
|
||||
numThreads = generalData->threadsPerReceiver;
|
||||
|
||||
|
||||
for ( int i=0; i < numThreads; ++i ) {
|
||||
|
||||
//create fifo structure
|
||||
numberofJobs = -1;
|
||||
if (SetupFifoStructure() == FAIL) {
|
||||
FILE_LOG (logERROR) << "Error: Could not allocate memory for fifo (index:" << i << ")";
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
//create threads
|
||||
listener.push_back(new Listener(fifo[i]));
|
||||
dataProcessor.push_back(new DataProcessor(fifo[i]));
|
||||
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
|
||||
FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";
|
||||
return FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
//local network parameters
|
||||
SetLocalNetworkParameters();
|
||||
|
||||
FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d);
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::resetAcquisitionCount() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " starting";
|
||||
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
|
||||
(*it)->ResetParametersforNewAcquisition();
|
||||
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
(*it)->ResetParametersforNewAcquisition();
|
||||
|
||||
FILE_LOG (logINFO) << "Acquisition Count has been reset";
|
||||
}
|
||||
|
||||
|
||||
|
||||
int UDPStandardImplementation::startReceiver(char *c) {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
ResetParametersforNewMeasurement();
|
||||
|
||||
if (CreateUDPSockets() == FAIL) {
|
||||
strcpy(c,"Could not create UDP Socket(s).");
|
||||
FILE_LOG(logERROR) << c;
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
if(fileWriteEnable){
|
||||
if (SetupWriter() == FAIL) {
|
||||
strcpy(c,"Could not create file.");
|
||||
FILE_LOG(logERROR) << c;
|
||||
return FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
//Let Threads continue to be ready for acquisition
|
||||
StartRunning();
|
||||
|
||||
FILE_LOG(logINFO) << "Receiver Started";
|
||||
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::stopReceiver(){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
FILE_LOG(logINFO) << "Stopping Receiver";
|
||||
|
||||
//set status to transmitting
|
||||
startReadout();
|
||||
|
||||
//wait until status is run_finished
|
||||
while(status == TRANSMITTING){
|
||||
usleep(5000);
|
||||
}
|
||||
|
||||
/* //change status
|
||||
pthread_mutex_lock(&statusMutex);
|
||||
status = IDLE;
|
||||
pthread_mutex_unlock(&(statusMutex));
|
||||
*/
|
||||
FILE_LOG(logINFO) << "Receiver Stopped";
|
||||
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
|
||||
cout << endl << endl;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::startReadout(){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
|
||||
if(status == RUNNING){
|
||||
|
||||
//needs to wait for packets only if activated
|
||||
if(activated){
|
||||
|
||||
//current packets caught
|
||||
int totalP = 0,prev=-1;
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
|
||||
totalP += (*it)->GetTotalPacketsCaught();
|
||||
|
||||
//current udp buffer received
|
||||
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
|
||||
currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer();
|
||||
|
||||
//wait for all packets
|
||||
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
|
||||
|
||||
//wait as long as there is change from prev totalP,
|
||||
//and also change from received in buffer to previous value
|
||||
//(as one listens to many at a time, shouldnt cut off in between)
|
||||
while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
|
||||
#ifdef VERY_VERBOSE
|
||||
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d PrevBuffer:%d currentBuffer:%d\n",prev,totalP,prevReceivedInBuffer,currentReceivedInBuffer);
|
||||
|
||||
#endif
|
||||
//usleep(2*1000*1000);
|
||||
usleep(5*1000);/* Need to find optimal time **/
|
||||
|
||||
prev = totalP;
|
||||
totalP = 0;
|
||||
prevReceivedInBuffer = currentReceivedInBuffer;
|
||||
currentReceivedInBuffer = 0;
|
||||
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
|
||||
totalP += (*it)->GetTotalPacketsCaught();
|
||||
currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer();
|
||||
}
|
||||
#ifdef VERY_VERBOSE
|
||||
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
|
||||
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*//set status
|
||||
pthread_mutex_lock(&statusMutex);
|
||||
status = TRANSMITTING;
|
||||
pthread_mutex_unlock(&statusMutex);*/
|
||||
|
||||
FILE_LOG(logINFO) << "Status: Transmitting";
|
||||
}
|
||||
|
||||
//shut down udp sockets so as to make listeners push dummy (end) packets for processors
|
||||
shutDownUDPSockets();
|
||||
}
|
||||
|
||||
|
||||
void UDPStandardImplementation::shutDownUDPSockets() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
|
||||
(*it)->ShutDownUDPSocket();
|
||||
}
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::closeFiles() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
(*it)->CloseFile();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::SetLocalNetworkParameters() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
|
||||
if (myDetectorType == EIGER)
|
||||
@ -285,30 +530,30 @@ void UDPStandardImplementation::SetLocalNetworkParameters(){
|
||||
|
||||
//to increase Socket Receiver Buffer size
|
||||
sprintf(command,"echo $((%d)) > /proc/sys/net/core/rmem_max",RECEIVE_SOCKET_BUFFER_SIZE);
|
||||
if (system(command)){
|
||||
FILE_LOG(logWARNING) << "No root permission to change Socket Receiver Buffer size (/proc/sys/net/core/rmem_max)";
|
||||
if (system(command)) {
|
||||
FILE_LOG (logWARNING) << "No root permission to change Socket Receiver Buffer size (/proc/sys/net/core/rmem_max)";
|
||||
return;
|
||||
}
|
||||
FILE_LOG(logINFO) << "Socket Receiver Buffer size (/proc/sys/net/core/rmem_max) modified to " << RECEIVE_SOCKET_BUFFER_SIZE ;
|
||||
FILE_LOG (logINFO) << "Socket Receiver Buffer size (/proc/sys/net/core/rmem_max) modified to " << RECEIVE_SOCKET_BUFFER_SIZE ;
|
||||
|
||||
|
||||
// to increase Max length of input packet queue
|
||||
sprintf(command,"echo %d > /proc/sys/net/core/netdev_max_backlog",MAX_SOCKET_INPUT_PACKET_QUEUE);
|
||||
if (system(command)){
|
||||
FILE_LOG(logWARNING) << "No root permission to change Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog)";
|
||||
if (system(command)) {
|
||||
FILE_LOG (logWARNING) << "No root permission to change Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog)";
|
||||
return;
|
||||
}
|
||||
FILE_LOG(logINFO) << "Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog) modified to " << MAX_SOCKET_INPUT_PACKET_QUEUE ;
|
||||
FILE_LOG (logINFO) << "Max length of input packet queue (/proc/sys/net/core/netdev_max_backlog) modified to " << MAX_SOCKET_INPUT_PACKET_QUEUE ;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int UDPStandardImplementation::SetupFifoStructure(){
|
||||
FILE_LOG(logDEBUG) << __AT__ << " called";
|
||||
int UDPStandardImplementation::SetupFifoStructure() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
|
||||
//recalculate number of jobs & fifodepth, return if no change
|
||||
if ((myDetectortype == GOTTHARD) || (myDetectortype = PROPIX)) {
|
||||
if ((myDetectorType == GOTTHARD) || (myDetectorType = PROPIX)) {
|
||||
|
||||
int oldnumberofjobs = numberofJobs;
|
||||
//listen to only n jobs at a time
|
||||
@ -316,38 +561,40 @@ int UDPStandardImplementation::SetupFifoStructure(){
|
||||
numberofJobs = frameToGuiFrequency;
|
||||
else {
|
||||
//random freq depends on acquisition period/time (calculate upto 100ms/period)
|
||||
i = ((acquisitionPeriod > 0) ?
|
||||
int i = ((acquisitionPeriod > 0) ?
|
||||
(SAMPLE_TIME_IN_NS/acquisitionPeriod):
|
||||
((acquisitionTime > 0) ? (SAMPLE_TIME_IN_NS/acquisitionTime) : SAMPLE_TIME_IN_NS));
|
||||
//must be > 0 and < max jobs
|
||||
numberofJobs = ((i < 1) ? 1 : ((i > MAX_JOBS_PER_THREAD) ? MAX_JOBS_PER_THREAD : i));
|
||||
}
|
||||
FILE_LOG(logINFO) << "Number of Jobs Per Thread:" << numberofJobs << endl;
|
||||
|
||||
FILE_LOG (logINFO) << "Number of Jobs Per Thread:" << numberofJobs << endl;
|
||||
|
||||
uint32_t oldfifodepth = fifoDepth;
|
||||
//reduce fifo depth if numberofJobsPerBuffer > 1 (to save memory)
|
||||
if(numberofJobsPerBuffer >1){
|
||||
fifoDepth = ((fifoDepth % numberofJobsPerBuffer) ?
|
||||
((fifoDepth/numberofJobsPerBuffer)+1) : //if not directly divisible
|
||||
(fifoDepth/numberofJobsPerBuffer));
|
||||
if (numberofJobs >1) {
|
||||
fifoDepth = ((fifoDepth % numberofJobs) ?
|
||||
((fifoDepth/numberofJobs)+1) : //if not directly divisible
|
||||
(fifoDepth/numberofJobs));
|
||||
}
|
||||
FILE_LOG(logINFO) << "Total Fifo Size:" << fifoSize;
|
||||
FILE_LOG (logINFO) << "Total Fifo Depth Recalculated:" << fifoDepth;
|
||||
|
||||
//no change, return
|
||||
if ((oldnumberofjobs == numberofJobs) && (oldfifodepth == fifoDepth))
|
||||
return OK;
|
||||
}
|
||||
}else
|
||||
numberofJobs = 1;
|
||||
|
||||
|
||||
//delete fifostructure
|
||||
//create fifostructure
|
||||
fifo.clear();
|
||||
for ( int i=0; i < numThreads; i++ ) {
|
||||
|
||||
//create fifo structure
|
||||
bool success = true;
|
||||
fifo.push_back( new Fifo ((generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), success));
|
||||
if (!success){
|
||||
fifo.push_back( new Fifo (
|
||||
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
|
||||
fifoDepth, success));
|
||||
if (!success) {
|
||||
cprintf(BG_RED,"Error: Could not allocate memory for listening \n");
|
||||
return FAIL;
|
||||
}
|
||||
@ -356,7 +603,82 @@ int UDPStandardImplementation::SetupFifoStructure(){
|
||||
listener[i]->SetFifo(fifo[i]);
|
||||
dataProcessor[i]->SetFifo(fifo[i]);
|
||||
}
|
||||
FILE_LOG(logINFO) << "Fifo structure(s) reconstructed";
|
||||
FILE_LOG (logINFO) << "Fifo structure(s) reconstructed";
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
|
||||
(*it)->ResetParametersforNewMeasurement();
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
|
||||
(*it)->ResetParametersforNewMeasurement();
|
||||
}
|
||||
|
||||
|
||||
|
||||
int UDPStandardImplementation::CreateUDPSockets() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
shutDownUDPSockets();
|
||||
|
||||
//if eth is mistaken with ip address
|
||||
if (strchr(eth,'.') != NULL){
|
||||
strcpy(eth,"");
|
||||
}
|
||||
if(!strlen(eth)){
|
||||
FILE_LOG(logWARNING) << "eth is empty. Listening to all";
|
||||
}
|
||||
bool error = false;
|
||||
for (unsigned int i = 0; i < listener.size(); ++i)
|
||||
if (listener[i]->CreateUDPSockets(udpPortNum[i], generalData->packetSize,
|
||||
(strlen(eth)?eth:NULL), generalData->headerPacketSize) == FAIL) {
|
||||
error = true;
|
||||
break;
|
||||
}
|
||||
if (error) {
|
||||
shutDownUDPSockets();
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
FILE_LOG(logDEBUG) << "UDP socket(s) created successfully.";
|
||||
cout << "Listener Ready ..." << endl;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
int UDPStandardImplementation::SetupWriter() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
bool error = false;
|
||||
for (unsigned int i = 0; i < dataProcessor.size(); ++i)
|
||||
if (dataProcessor[i]->CreateNewFile() == FAIL) {
|
||||
error = true;
|
||||
break;
|
||||
}
|
||||
if (error) {
|
||||
shutDownUDPSockets();
|
||||
closeFiles();
|
||||
return FAIL;
|
||||
}
|
||||
|
||||
cout << "Writer Ready ..." << endl;
|
||||
return OK;
|
||||
}
|
||||
|
||||
|
||||
void UDPStandardImplementation::StartRunning() {
|
||||
FILE_LOG (logDEBUG) << __AT__ << " called";
|
||||
|
||||
//set running mask and post semaphore to start the inner loop in execution thread
|
||||
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
|
||||
(*it)->StartRunning();
|
||||
(*it)->Continue();
|
||||
}
|
||||
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
|
||||
(*it)->StartRunning();
|
||||
(*it)->Continue();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user