trying different machine zmq

This commit is contained in:
Dhanya Maliakal
2016-11-04 11:03:24 +01:00
parent d4733543ab
commit 11d3511460

View File

@ -84,12 +84,12 @@ void UDPStandardImplementation::deleteMembers(){
//filter //filter
deleteFilter(); deleteFilter();
for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_LISTENING_THREADS; i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;} if(mem0[i]) {free(mem0[i]); mem0[i] = 0;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;} if(fifo[i]) {delete fifo[i]; fifo[i] = 0;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;} if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = 0;}
} }
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(latestData[i]) {delete[] latestData[i]; latestData[i] = NULL;} if(latestData[i]) {delete[] latestData[i]; latestData[i] = 0;}
} }
//kill threads //kill threads
if(threadStarted){ if(threadStarted){
@ -103,15 +103,15 @@ void UDPStandardImplementation::deleteMembers(){
void UDPStandardImplementation::deleteFilter(){ void UDPStandardImplementation::deleteFilter(){
FILE_LOG(logDEBUG) << __AT__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
moenchCommonModeSubtraction = NULL; moenchCommonModeSubtraction = 0;
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(singlePhotonDetectorObject[i]){ if(singlePhotonDetectorObject[i]){
delete []singlePhotonDetectorObject[i]; delete []singlePhotonDetectorObject[i];
singlePhotonDetectorObject[i] = NULL; singlePhotonDetectorObject[i] = 0;
} }
if(receiverData[i]){ if(receiverData[i]){
delete []receiverData[i]; delete []receiverData[i];
receiverData[i] = NULL; receiverData[i] = 0;
} }
} }
} }
@ -142,14 +142,14 @@ void UDPStandardImplementation::initializeMembers(){
//***file parameters*** //***file parameters***
#ifdef MYROOT1 #ifdef MYROOT1
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
myTree[i] = (NULL); myTree[i] = (0);
myFile[i] = (NULL); myFile[i] = (0);
} }
#endif #endif
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
strcpy(completeFileName[i],""); strcpy(completeFileName[i],"");
strcpy(fileHeader[i],""); strcpy(fileHeader[i],"");
sfilefd[i] = NULL; sfilefd[i] = 0;
} }
maxFramesPerFile = 0; maxFramesPerFile = 0;
fileCreateSuccess = false; fileCreateSuccess = false;
@ -176,18 +176,18 @@ void UDPStandardImplementation::initializeMembers(){
//***receiver parameters*** //***receiver parameters***
for(int i=0; i < MAX_NUMBER_OF_LISTENING_THREADS; i++){ for(int i=0; i < MAX_NUMBER_OF_LISTENING_THREADS; i++){
buffer[i] = NULL; buffer[i] = 0;
mem0[i] = NULL; mem0[i] = 0;
fifo[i] = NULL; fifo[i] = 0;
fifoFree[i] = NULL; fifoFree[i] = 0;
udpSocket[i] = NULL; udpSocket[i] = 0;
} }
numberofJobsPerBuffer = -1; numberofJobsPerBuffer = -1;
fifoSize = 0; fifoSize = 0;
//***receiver to GUI parameters*** //***receiver to GUI parameters***
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
latestData[i] = NULL; latestData[i] = 0;
guiNumPackets[i] = 0; guiNumPackets[i] = 0;
strcpy(guiFileName[i],""); strcpy(guiFileName[i],"");
frametoGuiCounter[i] = 0; frametoGuiCounter[i] = 0;
@ -222,10 +222,10 @@ void UDPStandardImplementation::initializeMembers(){
//***filter parameters*** //***filter parameters***
commonModeSubtractionEnable = false; commonModeSubtractionEnable = false;
moenchCommonModeSubtraction = NULL; moenchCommonModeSubtraction = 0;
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
singlePhotonDetectorObject[i] = NULL; singlePhotonDetectorObject[i] = 0;
receiverData[i] = NULL; receiverData[i] = 0;
} }
@ -242,7 +242,7 @@ void UDPStandardImplementation::initializeFilter(){
int sign = 1, csize, i; int sign = 1, csize, i;
//common mode initialization //common mode initialization
moenchCommonModeSubtraction = NULL; moenchCommonModeSubtraction = 0;
if (commonModeSubtractionEnable){ if (commonModeSubtractionEnable){
if(myDetectorType == MOENCH) if(myDetectorType == MOENCH)
moenchCommonModeSubtraction=new moenchCommonMode(); moenchCommonModeSubtraction=new moenchCommonMode();
@ -349,7 +349,7 @@ int UDPStandardImplementation::setupFifoStructure(){
//cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i])); //cprintf(BLUE,"FifoFree[%d]: value:%d, pop 0x%x\n",i,fifoFree[i]->getSemValue(),(void*)(buffer[i]));
} }
delete fifoFree[i]; delete fifoFree[i];
fifoFree[i] = NULL; fifoFree[i] = 0;
} }
if(fifo[i]){ if(fifo[i]){
while(!fifo[i]->isEmpty()){ while(!fifo[i]->isEmpty()){
@ -357,11 +357,11 @@ int UDPStandardImplementation::setupFifoStructure(){
//cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(buffer[i])); //cprintf(CYAN,"Fifo[%d]: value:%d, pop 0x%x\n",i,fifo[i]->getSemValue(),(void*)(buffer[i]));
} }
delete fifo[i]; delete fifo[i];
fifo[i] = NULL; fifo[i] = 0;
} }
if(mem0[i]){ if(!mem0[i]){
free(mem0[i]); free(mem0[i]);
mem0[i] = NULL; mem0[i] = 0;
} }
//creating //creating
@ -370,7 +370,7 @@ int UDPStandardImplementation::setupFifoStructure(){
//allocate memory //allocate memory
mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize); mem0[i] = (char*)malloc((bufferSize * numberofJobsPerBuffer + fifoBufferHeaderSize) * fifoSize);
if (mem0[i] == NULL){ if (mem0[i]){
cprintf(BG_RED,"Error: Could not allocate memory for listening \n"); cprintf(BG_RED,"Error: Could not allocate memory for listening \n");
return FAIL; return FAIL;
} }
@ -633,7 +633,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
//gui buffer //gui buffer
for(int i=0;i<numberofWriterThreads;i++){ for(int i=0;i<numberofWriterThreads;i++){
if(latestData[i]){delete[] latestData[i]; latestData[i] = NULL;} if(latestData[i]){delete[] latestData[i]; latestData[i] = 0;}
latestData[i] = new char[bufferSize]; latestData[i] = new char[bufferSize];
} }
//restructure fifo //restructure fifo
@ -688,7 +688,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b){
//gui buffer //gui buffer
for(int i=0;i<numberofWriterThreads;i++){ for(int i=0;i<numberofWriterThreads;i++){
if(latestData[i]){delete[] latestData[i]; latestData[i] = NULL;} if(latestData[i]){delete[] latestData[i]; latestData[i] = 0;}
latestData[i] = new char[bufferSize]; latestData[i] = new char[bufferSize];
} }
@ -871,7 +871,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
//allocate for latest data (frame copy for gui), free variables //allocate for latest data (frame copy for gui), free variables
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){ for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(latestData[i]) {delete[] latestData[i]; latestData[i] = NULL;} if(latestData[i]) {delete[] latestData[i]; latestData[i] = 0;}
latestData[i] = new char[bufferSize]; latestData[i] = new char[bufferSize];
} }
@ -940,7 +940,7 @@ int UDPStandardImplementation::startReceiver(char *c){
totalWritingPacketCount[i] = 0; totalWritingPacketCount[i] = 0;
if(sfilefd[i]){ if(sfilefd[i]){
fclose(sfilefd[i]); fclose(sfilefd[i]);
sfilefd[i] = NULL; sfilefd[i] = 0;
} }
//reset gui variables //reset gui variables
frametoGuiCounter[i] = 0; frametoGuiCounter[i] = 0;
@ -1082,7 +1082,7 @@ int UDPStandardImplementation::shutDownUDPSockets(){
udpSocket[i]->ShutDownSocket(); udpSocket[i]->ShutDownSocket();
FILE_LOG(logINFO) << "Shut down UDP Socket " << i; FILE_LOG(logINFO) << "Shut down UDP Socket " << i;
delete udpSocket[i]; delete udpSocket[i];
udpSocket[i] = NULL; udpSocket[i] = 0;
} }
} }
return OK; return OK;
@ -1178,7 +1178,7 @@ void UDPStandardImplementation::closeFile(int ithread){
FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd)); FILE_LOG(logDEBUG4) << "Going to close file: " << fileno(sfilefd));
#endif #endif
fclose(sfilefd[ithread]); fclose(sfilefd[ithread]);
sfilefd[ithread] = NULL; sfilefd[ithread] = 0;
} }
} }
@ -1190,7 +1190,7 @@ void UDPStandardImplementation::closeFile(int ithread){
FILE_LOG(logDEBUG4) << "sfilefd: " << (int)sfilefd[i]; FILE_LOG(logDEBUG4) << "sfilefd: " << (int)sfilefd[i];
#endif #endif
fclose(sfilefd[0]); fclose(sfilefd[0]);
sfilefd[0] = NULL; sfilefd[0] = 0;
} }
#endif #endif
@ -1211,10 +1211,10 @@ void UDPStandardImplementation::closeFile(int ithread){
//close file //close file
if(myTree[ithread] && myFile[ithread]) if(myTree[ithread] && myFile[ithread])
myFile[ithread] = myTree[ithread]->GetCurrentFile(); myFile[ithread] = myTree[ithread]->GetCurrentFile();
if(myFile[ithread] != NULL) if(myFile[ithread] != 0)
myFile[ithread]->Close(); myFile[ithread]->Close();
myFile[ithread] = NULL; myFile[ithread] = 0;
myTree[ithread] = NULL; myTree[ithread] = 0;
pthread_mutex_unlock(&writeMutex); pthread_mutex_unlock(&writeMutex);
#endif #endif
@ -1554,17 +1554,19 @@ int UDPStandardImplementation::createNewFile(int ithread){
//close file pointers //close file pointers
if(sfilefd[ithread]){ if(sfilefd[ithread]){
fclose(sfilefd[ithread]); fclose(sfilefd[ithread]);
sfilefd[ithread] = NULL; sfilefd[ithread] = 0;
} }
//create file //create file
if(!overwriteEnable){ if(!overwriteEnable){
if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){ if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){
FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread]; FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread];
sfilefd[ithread] = 0;
return FAIL; return FAIL;
} }
}else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){ }else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){
FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread]; FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread];
sfilefd[ithread] = 0;
return FAIL; return FAIL;
} }
//setting file buffer size to 16mb //setting file buffer size to 16mb
@ -1961,7 +1963,7 @@ void UDPStandardImplementation::startListening(){
uint32_t rc; //size of buffer received in bytes uint32_t rc; //size of buffer received in bytes
//split frames for data compression //split frames for data compression
int carryonBufferSize; //from previous buffer to keep frames together in a buffer int carryonBufferSize; //from previous buffer to keep frames together in a buffer
char* tempBuffer = NULL; //temporary buffer to store split frames char* tempBuffer = 0; //temporary buffer to store split frames
/* outer loop - loops once for each acquisition */ /* outer loop - loops once for each acquisition */
@ -1971,7 +1973,7 @@ void UDPStandardImplementation::startListening(){
//compression variables reset before acquisition //compression variables reset before acquisition
carryonBufferSize = 0; carryonBufferSize = 0;
if(dataCompressionEnable){ if(dataCompressionEnable){
if(tempBuffer!=NULL){delete []tempBuffer;tempBuffer=NULL;} if(tempBuffer){delete []tempBuffer;tempBuffer=0;}
tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame
} }
@ -1996,7 +1998,7 @@ void UDPStandardImplementation::startListening(){
//udpsocket doesnt exist //udpsocket doesnt exist
if(activated && udpSocket[ithread] == NULL){ if(activated && !udpSocket[ithread]){
FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier"; FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier";
stopListening(ithread,0); stopListening(ithread,0);
continue; continue;
@ -2523,7 +2525,7 @@ void UDPStandardImplementation::startWriting(){
//variable definitions //variable definitions
char* wbuf; //buffer popped from FIFO char* wbuf; //buffer popped from FIFO
sfilefd[ithread] = NULL; //file pointer sfilefd[ithread] = 0; //file pointer
uint64_t nf; //for compression, number of frames uint64_t nf; //for compression, number of frames
int listenfifoIndex = ithread; int listenfifoIndex = ithread;