somewhere

This commit is contained in:
Dhanya Maliakal
2017-02-10 10:08:00 +01:00
parent c89f6e649c
commit b260d08225
18 changed files with 1076 additions and 356 deletions

View File

@ -62,6 +62,7 @@ void UDPStandardImplementation::InitializeMembers() {
//*** receiver parameters ***
numThreads = 1;
numberofJobs = 1;
callbackAction = DO_EVERYTHING;
//*** mutex ***
pthread_mutex_init(&statusMutex,NULL);
@ -76,33 +77,69 @@ void UDPStandardImplementation::InitializeMembers() {
uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
uint64_t sum = 0;
uint32_t flagsum = 0;
vector<DataProcessor*>::const_iterator it;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0);
sum += (*it)->GetNumTotalFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
return (sum/dataProcessor.size());
}
uint64_t UDPStandardImplementation::getFramesCaught() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
uint64_t sum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetNumFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
return (sum/dataProcessor.size());
}
int64_t UDPStandardImplementation::getAcquisitionIndex() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
//no data processed
if(!DataProcessor::GetAcquisitionStartedFlag())
return -1;
uint64_t sum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetProcessedAcquisitionIndex();
}
//no data processed
if (flagsum != dataProcessor.size()) return -1;
return (sum/dataProcessor.size());
}
void UDPStandardImplementation::setFileFormat(const fileFormat f){
FILE_LOG(logDEBUG) << __AT__ << " starting";
//destroy file writer, set file format and create file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType);
}
void UDPStandardImplementation::setFileName(const char c[]) {
FILE_LOG (logDEBUG) << __AT__ << " starting";
@ -114,6 +151,8 @@ void UDPStandardImplementation::setFileName(const char c[]) {
if (uscore!=string::npos) {
if (sscanf(tempname.substr(uscore+1, tempname.size()-uscore-1).c_str(), "d%d", &detindex)) {
detID = detindex;
tempname=tempname.substr(0,uscore);
strcpy(fileName, tempname.c_str());
}
}
if (detindex == -1)
@ -146,6 +185,10 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
Listener::SetGeneralData(generalData);
DataProcessor::SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->SetMaxFramesPerFile();
}
}
FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable;
return OK;
@ -324,7 +367,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG (logDEBUG) << "Setting receiver type";
DeleteMembers();cout<<"size of fifo:"<<fifo.size()<<endl;
DeleteMembers();
InitializeMembers();
myDetectorType = d;
switch(myDetectorType) {
@ -357,6 +400,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
numThreads = generalData->threadsPerReceiver;
fifoDepth = generalData->defaultFifoDepth;
//local network parameters
SetLocalNetworkParameters();
//create fifo structure
numberofJobs = -1;
if (SetupFifoStructure() == FAIL) {
@ -366,8 +412,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
//create threads
for ( int i=0; i < numThreads; ++i ) {
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i]));
dataProcessor.push_back(new DataProcessor(fifo[i], &status, &statusMutex));
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth));
dataProcessor.push_back(new DataProcessor(fifo[i], &status, &statusMutex, &fileFormatType, &fileWriteEnable,
&callbackAction, rawDataReadyCallBack,pRawDataReady));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
@ -380,8 +427,11 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
}
//local network parameters
SetLocalNetworkParameters();
//set up writer and callbacks
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetupFileWriter(fileName, filePath, &fileIndex, &frameIndexEnable,
&overwriteEnable, &detID, &numThreads);
FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d);
return OK;
@ -413,6 +463,7 @@ int UDPStandardImplementation::startReceiver(char *c) {
FILE_LOG(logERROR) << c;
return FAIL;
}
cout << "Listener Ready ..." << endl;
if(fileWriteEnable){
if (SetupWriter() == FAIL) {
@ -421,6 +472,24 @@ int UDPStandardImplementation::startReceiver(char *c) {
return FAIL;
}
}
cout << "Processor Ready ..." << endl;
//callbacks
callbackAction = DO_EVERYTHING;
if (startAcquisitionCallBack) /** file path and file index not required?? or need to include detector index? do they need the datasize? its given for write data anyway */
callbackAction=startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
if (callbackAction < DO_EVERYTHING) {
FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back.";
if (rawDataReadyCallBack) {
FILE_LOG(logINFO) << "Data Write has been defined externally";
}
} else if(!fileWriteEnable) {
FILE_LOG(logINFO) << "Data will not be saved";
}
//change status
pthread_mutex_lock(&statusMutex);
@ -432,6 +501,7 @@ int UDPStandardImplementation::startReceiver(char *c) {
FILE_LOG(logINFO) << "Receiver Started";
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
return OK;
}
@ -443,10 +513,41 @@ void UDPStandardImplementation::stopReceiver(){
//set status to transmitting
startReadout();
//wait until status is run_finished
while(status == TRANSMITTING){
//wait for the processes to be done
while(Listener::GetRunningMask()){
usleep(5000);
}
while(DataProcessor::GetRunningMask()){
usleep(5000);
}
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
pthread_mutex_unlock(&(statusMutex));
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
{ //statistics
int tot = 0;
for (int i = 0; i < numThreads; i++) {
tot += dataProcessor[i]->GetNumFramesCaught();
if (dataProcessor[i]->GetNumFramesCaught() < numberOfFrames) {
cprintf(RED, "\nPort %d\n",udpPortNum[i]);
cprintf(RED, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught());
cprintf(RED, "Frames Caught \t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught());
cprintf(RED, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught());
}else{
cprintf(GREEN, "\nPort %d\n",udpPortNum[i]);
cprintf(GREEN, "Missing Packets \t: %lld\n",(long long int)numberOfFrames*generalData->packetsPerFrame-listener[i]->GetTotalPacketsCaught());
cprintf(GREEN, "Frames Caught \t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught());
cprintf(GREEN, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught());
}
}
//callback
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((int)(tot/numThreads), pAcquisitionFinished);
}
//change status
pthread_mutex_lock(&statusMutex);
@ -473,18 +574,11 @@ void UDPStandardImplementation::startReadout(){
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetTotalPacketsCaught();
//current udp buffer received
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer();
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait as long as there is change from prev totalP,
//and also change from received in buffer to previous value
//(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
while(prev != totalP){
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d PrevBuffer:%d currentBuffer:%d\n",prev,totalP,prevReceivedInBuffer,currentReceivedInBuffer);
@ -494,16 +588,11 @@ void UDPStandardImplementation::startReadout(){
prev = totalP;
totalP = 0;
prevReceivedInBuffer = currentReceivedInBuffer;
currentReceivedInBuffer = 0;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetTotalPacketsCaught();
currentReceivedInBuffer += (*it)->GetNumReceivedinUDPBuffer();
}
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
#endif
}
}
@ -644,18 +733,9 @@ void UDPStandardImplementation::ResetParametersforNewMeasurement() {
int UDPStandardImplementation::CreateUDPSockets() {
FILE_LOG (logDEBUG) << __AT__ << " called";
shutDownUDPSockets();
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
strcpy(eth,"");
}
if(!strlen(eth)){
FILE_LOG(logWARNING) << "eth is empty. Listening to all";
}
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
if (listener[i]->CreateUDPSockets((strlen(eth)?eth:NULL)) == FAIL) {
if (listener[i]->CreateUDPSockets() == FAIL) {
error = true;
break;
}
@ -665,7 +745,6 @@ int UDPStandardImplementation::CreateUDPSockets() {
}
FILE_LOG(logDEBUG) << "UDP socket(s) created successfully.";
cout << "Listener Ready ..." << endl;
return OK;
}