updated to have activate function implemented

This commit is contained in:
Dhanya Maliakal
2016-10-05 09:30:08 +02:00
parent 489b623afd
commit bf54c15560
2 changed files with 30 additions and 19 deletions

View File

@ -97,6 +97,10 @@ void UDPStandardImplementation::deleteMembers(){
createWriterThreads(true); createWriterThreads(true);
threadStarted = false; threadStarted = false;
} }
if(zmqThreadStarted){
createDataCallbackThreads(true);
zmqThreadStarted = false;
}
} }
void UDPStandardImplementation::deleteFilter(){ void UDPStandardImplementation::deleteFilter(){
@ -452,7 +456,6 @@ void UDPStandardImplementation::setFileName(const char c[]){
detID = 0; detID = 0;
} }
if(dataStreamEnable && (strcmp(oldfilename,fileName))){ if(dataStreamEnable && (strcmp(oldfilename,fileName))){
if(zmqThreadStarted) if(zmqThreadStarted)
createDataCallbackThreads(true); createDataCallbackThreads(true);
@ -576,6 +579,8 @@ uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){
int olddatasend = dataStreamEnable; int olddatasend = dataStreamEnable;
dataStreamEnable = enable; dataStreamEnable = enable;
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
//if there is a change //if there is a change
if(olddatasend != dataStreamEnable){ if(olddatasend != dataStreamEnable){
if(zmqThreadStarted) if(zmqThreadStarted)
@ -589,9 +594,6 @@ uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){
} }
} }
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
return OK; return OK;
} }
@ -1968,7 +1970,7 @@ void UDPStandardImplementation::startListening(){
//udpsocket doesnt exist //udpsocket doesnt exist
if ((status == TRANSMITTING)||(rc == 0 && activated == 0)){ if(activated && udpSocket[ithread] == NULL){
FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier"; FILE_LOG(logERROR) << "Listening_Thread " << ithread << ": UDP Socket not created or shut down earlier";
stopListening(ithread,0); stopListening(ithread,0);
continue; continue;
@ -1981,7 +1983,7 @@ void UDPStandardImplementation::startListening(){
if((!measurementStarted) && (rc > 0)) if((!measurementStarted) && (rc > 0))
startFrameIndices(ithread); startFrameIndices(ithread);
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if (status == TRANSMITTING){ if (status == TRANSMITTING||(rc == 0 && activated == 0)){
stopListening(ithread,rc); stopListening(ithread,rc);
continue; continue;
} }
@ -2040,20 +2042,25 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
if(!activated){ if(!activated){
//cSize = 0 for deactivated //cSize = 0 for deactivated
int framestoclone = 0; int framestoclone = 0;
//first
if(deactivatedFrameNumber[ithread]==0)
deactivatedFrameNumber[ithread]++;
//done //done
if(deactivatedFrameNumber[ithread] == numberOfFrames) if(deactivatedFrameNumber[ithread] == (numberOfFrames+1))
return 0; return 0;
//last //last
if((deactivatedFrameNumber[ithread] + deactivatedFrameIncrement) > numberOfFrames) if((deactivatedFrameNumber[ithread] + deactivatedFrameIncrement) > (numberOfFrames+1))
framestoclone = numberOfFrames - deactivatedFrameNumber[ithread]; framestoclone = (numberOfFrames+1) - deactivatedFrameNumber[ithread];
//in progress //in progress
else else
framestoclone = deactivatedFrameIncrement; framestoclone = deactivatedFrameIncrement;
//copy dummy packets //copy dummy packets
memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, 0xFF,framestoclone*packetsPerFrame*onePacketSize); receivedSize = framestoclone*packetsPerFrame*onePacketSize;
memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, 0xFF,receivedSize);
//set fnum, pnum and deactivatedpacket label //set fnum, pnum and deactivatedpacket label
eiger_packet_header_t* header; eiger_packet_header_t* header;
@ -2061,24 +2068,28 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int cSize, ch
int pnum=0; int pnum=0;
//loop by each packet //loop by each packet
for(int offset=HEADER_SIZE_NUM_TOT_PACKETS; for(int offset=HEADER_SIZE_NUM_TOT_PACKETS;
offset<framestoclone*packetsPerFrame*onePacketSize; offset<receivedSize;
offset+=onePacketSize){ offset+=onePacketSize){
header = (eiger_packet_header_t*)(buffer[ithread] + offset); header = (eiger_packet_header_t*)(buffer[ithread] + offset);
footer = (eiger_packet_footer_t*)(buffer[ithread] + offset + footerOffset); footer = (eiger_packet_footer_t*)(buffer[ithread] + offset + footerOffset);
*( (uint64_t*) footer) = ++deactivatedFrameNumber[ithread]; *( (uint64_t*) footer) = deactivatedFrameNumber[ithread];
*( (uint16_t*) footer->packetNumber) = ++pnum; *( (uint16_t*) footer->packetNumber) = ++pnum;
*( (uint16_t*) header->missingPacket) = deactivatedPacketValue; *( (uint16_t*) header->missingPacket) = deactivatedPacketValue;
#ifdef MANUALDEBUG #ifdef MANUALDEBUG
cprintf(GREEN,"thread:%d pnum:%d fnum:%d\n", if(!ithread){
ithread, cprintf(GREEN,"thread:%d pnum:%d fnum:%d\n",
(*( (uint16_t*) footer->packetNumber)), ithread,
(uint32_t)(*( (uint64_t*) footer))); (*( (uint16_t*) footer->packetNumber)),
(uint32_t)(*( (uint64_t*) footer)));
}
#endif #endif
if(pnum == packetsPerFrame) if(pnum == packetsPerFrame){
pnum = 0; pnum = 0;
deactivatedFrameNumber[ithread]++;
}
} }
return framestoclone*onePacketSize; return receivedSize;
} }

View File

@ -877,7 +877,7 @@ int slsReceiverTCPIPInterface::stop_receiver(){
ret=FAIL; ret=FAIL;
} }
else{ else{
if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){ if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING || receiverBase->getStatus()==RUN_FINISHED){
receiverBase->stopReceiver(); receiverBase->stopReceiver();
} }
s = receiverBase->getStatus(); s = receiverBase->getStatus();