trying to get in changes for activate in receiver

This commit is contained in:
Dhanya Maliakal
2016-10-05 08:24:35 +02:00
parent f6b7fd7aa3
commit 489b623afd
8 changed files with 212 additions and 28 deletions

View File

@@ -212,6 +212,12 @@ void UDPStandardImplementation::initializeMembers(){
createFileMask = 0x0;
killAllWritingThreads = false;
//***deactivated parameters***
for(int i=0; i < MAX_NUMBER_OF_LISTENING_THREADS; i++)
deactivatedFrameNumber[i] = 0;
deactivatedFrameIncrement = 0;
//***filter parameters***
commonModeSubtractionEnable = false;
moenchCommonModeSubtraction = NULL;
@@ -957,6 +963,11 @@ int UDPStandardImplementation::startReceiver(char *c){
fileCreateSuccess = false;
pthread_mutex_unlock(&statusMutex);
//deactivated parameters
for(int i = 0; i < numberofListeningThreads; ++i)
deactivatedFrameNumber[i] = 0;
deactivatedFrameIncrement = (bufferSize/(onePacketSize*packetsPerFrame))*numberofJobsPerBuffer;
FILE_LOG(logINFO) << "Deactivated Frame Increment:" << deactivatedFrameIncrement;
//Print Receiver Configuration
@@ -1095,44 +1106,48 @@ void UDPStandardImplementation::startReadout(){
if(status == RUNNING){
//check if all packets got
int totalP = 0,prev=-1,i;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
//check if current buffer still receiving something
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
//needs to wait for packets only if activated
if(activated){
//check if all packets got
int totalP = 0,prev=-1,i;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
//wait for all packets
if(totalP!=numberOfFrames*packetsPerFrame*numberofListeningThreads){
//check if current buffer still receiving something
int currentReceivedInBuffer=0,prevReceivedInBuffer=-1;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
//wait as long as there is change from prev totalP,
//and also change from received in buffer to previous value
//(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
//wait for all packets
if(totalP!=numberOfFrames*packetsPerFrame*numberofListeningThreads){
//wait as long as there is change from prev totalP,
//and also change from received in buffer to previous value
//(as one listens to many at a time, shouldnt cut off in between)
while((prev != totalP) || (prevReceivedInBuffer!= currentReceivedInBuffer)){
#ifdef DEBUG5
cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
cprintf(MAGENTA,"waiting for all packets totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
#endif
usleep(5000);/* Need to find optimal time (exposure time and acquisition period) **/
prev = totalP;
totalP = 0;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
usleep(5000);/* Need to find optimal time (exposure time and acquisition period) **/
prev = totalP;
totalP = 0;
for(i=0; i<numberofListeningThreads; ++i)
totalP += totalListeningPacketCount[i];
prevReceivedInBuffer = currentReceivedInBuffer;
currentReceivedInBuffer = 0;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
prevReceivedInBuffer = currentReceivedInBuffer;
currentReceivedInBuffer = 0;
for(i=0; i<numberofListeningThreads; ++i)
currentReceivedInBuffer += udpSocket[i]->getCurrentTotalReceived();
#ifdef DEBUG5
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
cprintf(MAGENTA,"\tupdated: totalP:%d currently in buffer:%d\n",totalP,currentReceivedInBuffer);
#endif
}
}
}
//set status
@@ -1953,7 +1968,7 @@ void UDPStandardImplementation::startListening(){
//udpsocket doesnt exist
if(status == TRANSMITTING){
if ((status == TRANSMITTING)||(rc == 0 && activated == 0)){
FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier";
stopListening(ithread,0);
continue;
@@ -2023,6 +2038,50 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
//carry over from previous buffer
if(cSize) memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
if(!activated){
//cSize = 0 for deactivated
int framestoclone = 0;
//done
if(deactivatedFrameNumber[ithread] == numberOfFrames)
return 0;
//last
if((deactivatedFrameNumber[ithread] + deactivatedFrameIncrement) > numberOfFrames)
framestoclone = numberOfFrames - deactivatedFrameNumber[ithread];
//in progress
else
framestoclone = deactivatedFrameIncrement;
//copy dummy packets
memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, 0xFF,framestoclone*packetsPerFrame*onePacketSize);
//set fnum, pnum and deactivatedpacket label
eiger_packet_header_t* header;
eiger_packet_footer_t* footer;
int pnum=0;
//loop by each packet
for(int offset=HEADER_SIZE_NUM_TOT_PACKETS;
offset<framestoclone*packetsPerFrame*onePacketSize;
offset+=onePacketSize){
header = (eiger_packet_header_t*)(buffer[ithread] + offset);
footer = (eiger_packet_footer_t*)(buffer[ithread] + offset + footerOffset);
*( (uint64_t*) footer) = ++deactivatedFrameNumber[ithread];
*( (uint16_t*) footer->packetNumber) = ++pnum;
*( (uint16_t*) header->missingPacket) = deactivatedPacketValue;
#ifdef MANUALDEBUG
cprintf(GREEN,"thread:%d pnum:%d fnum:%d\n",
ithread,
(*( (uint16_t*) footer->packetNumber)),
(uint32_t)(*( (uint64_t*) footer)));
#endif
if(pnum == packetsPerFrame)
pnum = 0;
}
return framestoclone*onePacketSize;
}
if(status != TRANSMITTING)
receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, (bufferSize * numberofJobsPerBuffer) - cSize);
//eiger returns 0 when header packet caught
@@ -2604,6 +2663,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
}
}
if(!activated)
cprintf(RED,"Note: Deactivated Receiver\n");
//acquisition end
if (acquisitionFinishedCallBack)