works for deactivated server and receiver

This commit is contained in:
Dhanya Maliakal 2016-10-04 07:36:00 +02:00
parent 572047b72d
commit 2a4bd8022e
8 changed files with 195 additions and 20 deletions

View File

@ -193,6 +193,14 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
*/ */
runStatus getStatus() const; runStatus getStatus() const;
/**
* Get activate
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
* @return 0 for deactivated, 1 for activated
*/
int getActivate() const;
@ -417,6 +425,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
*/ */
void closeFile(int i = -1); void closeFile(int i = -1);
/**
* Activate / Deactivate Receiver
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
*/
int setActivate(int enable = -1);
//***callback functions*** //***callback functions***
/** /**
@ -485,6 +500,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
const static int MAX_NUMBER_OF_LISTENING_THREADS = 2; const static int MAX_NUMBER_OF_LISTENING_THREADS = 2;
/** Receiver Status */ /** Receiver Status */
runStatus status; runStatus status;
/** Activated/Deactivated */
int activated;
//***connection parameters*** //***connection parameters***
/** Ethernet Interface */ /** Ethernet Interface */
@ -528,6 +545,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
//***callback parameters*** //***callback parameters***
/** /**
* function being called back for start acquisition * function being called back for start acquisition

View File

@ -252,6 +252,14 @@ class UDPInterface {
*/ */
virtual slsReceiverDefs::runStatus getStatus() const = 0; virtual slsReceiverDefs::runStatus getStatus() const = 0;
/**
* Get activate
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
* @return 0 for deactivated, 1 for activated
*/
virtual int getActivate() const = 0;
@ -474,6 +482,13 @@ class UDPInterface {
*/ */
virtual void closeFile(int i = -1) = 0; virtual void closeFile(int i = -1) = 0;
/**
* Activate / Deactivate Receiver
* If deactivated, receiver will write dummy packets 0xFF
* (as it will receive nothing from detector)
*/
virtual int setActivate(int enable = -1) = 0;
//***callback functions*** //***callback functions***
/** /**

View File

@ -597,6 +597,7 @@ private:
/** Missing Packet identifier value */ /** Missing Packet identifier value */
const static uint16_t missingPacketValue = 0xFFFF; const static uint16_t missingPacketValue = 0xFFFF;
const static uint16_t deactivatedPacketValue = 0xFEFE;
/** Dummy Packet identifier value */ /** Dummy Packet identifier value */
const static uint32_t dummyPacketValue = 0xFFFFFFFF; const static uint32_t dummyPacketValue = 0xFFFFFFFF;
@ -675,6 +676,9 @@ private:
bool killAllWritingThreads; bool killAllWritingThreads;
//***deactivated parameters***
uint64_t deactivated_framenumber[MAX_NUMBER_OF_LISTENING_THREADS];
uint32_t deactivated_packetnumber[MAX_NUMBER_OF_LISTENING_THREADS];

View File

@ -207,6 +207,10 @@ private:
/** set fifo depth */ /** set fifo depth */
int set_fifo_depth(); int set_fifo_depth();
/** activate/ deactivate */
int set_activate();
//General Functions //General Functions
/** Locks Receiver */ /** Locks Receiver */
int lock_receiver(); int lock_receiver();

View File

@ -49,7 +49,9 @@ enum {
F_ENABLE_RECEIVER_OVERWRITE, /**< set overwrite flag in receiver */ F_ENABLE_RECEIVER_OVERWRITE, /**< set overwrite flag in receiver */
F_ENABLE_RECEIVER_TEN_GIGA, /**< enable 10Gbe in receiver */ F_ENABLE_RECEIVER_TEN_GIGA, /**< enable 10Gbe in receiver */
F_SET_RECEIVER_FIFO_DEPTH /**< set receiver fifo depth */ F_SET_RECEIVER_FIFO_DEPTH, /**< set receiver fifo depth */
F_ACTIVATE /** < activate/deactivate readout */
/* Always append functions hereafter!!! */ /* Always append functions hereafter!!! */
}; };

View File

@ -49,6 +49,7 @@ void UDPBaseImplementation::initializeMembers(){
//***receiver parameters*** //***receiver parameters***
status = IDLE; status = IDLE;
activated = true;
//***connection parameters*** //***connection parameters***
strcpy(eth,""); strcpy(eth,"");
@ -187,7 +188,7 @@ uint32_t UDPBaseImplementation::getFifoDepth() const{ FILE_LOG(logDEBUG) << __AT
/***receiver status***/ /***receiver status***/
slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return status;} slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return status;}
int UDPBaseImplementation::getActivate() const{FILE_LOG(logDEBUG) << __AT__ << " starting"; return activated;}
/************************************************************************* /*************************************************************************
@ -448,6 +449,17 @@ void UDPBaseImplementation::closeFile(int i){
} }
int UDPBaseImplementation::setActivate(int enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(enable != -1){
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
}
return activated;
}
/***callback functions***/ /***callback functions***/
void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){ void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){
startAcquisitionCallBack=func; startAcquisitionCallBack=func;

View File

@ -191,6 +191,12 @@ void UDPStandardImplementation::initializeMembers(){
createFileMask = 0x0; createFileMask = 0x0;
killAllWritingThreads = false; killAllWritingThreads = false;
//***deactivated parameters***
for(int i=0; i < MAX_NUMBER_OF_LISTENING_THREADS; i++){
deactivated_framenumber[i] = 0;
deactivated_packetnumber[i] = 0;
}
//***filter parameters*** //***filter parameters***
commonModeSubtractionEnable = false; commonModeSubtractionEnable = false;
moenchCommonModeSubtraction = NULL; moenchCommonModeSubtraction = NULL;
@ -837,6 +843,11 @@ int UDPStandardImplementation::startReceiver(char *c){
fileCreateSuccess = false; fileCreateSuccess = false;
pthread_mutex_unlock(&statusMutex); pthread_mutex_unlock(&statusMutex);
//deactivated parameters
for(int i = 0; i < numberofListeningThreads; ++i){
deactivated_framenumber[i] = 0;
deactivated_packetnumber[i] = 0;
}
//Print Receiver Configuration //Print Receiver Configuration
if(myDetectorType != EIGER){ if(myDetectorType != EIGER){
@ -959,6 +970,8 @@ void UDPStandardImplementation::startReadout(){
if(status == RUNNING){ if(status == RUNNING){
//needs to wait for packets only if activated
if(activated){
//check if all packets got //check if all packets got
int totalP = 0,prev,i; int totalP = 0,prev,i;
for(i=0; i<numberofListeningThreads; ++i){ for(i=0; i<numberofListeningThreads; ++i){
@ -974,7 +987,7 @@ void UDPStandardImplementation::startReadout(){
cprintf(MAGENTA,"waiting for all packets totalP:%d\n",totalP); cprintf(MAGENTA,"waiting for all packets totalP:%d\n",totalP);
#endif #endif
usleep(5000);/* Need to find optimal time (exposure time and acquisition period) **/ usleep(50000);/* Need to find optimal time (exposure time and acquisition period) **/
prev = totalP; prev = totalP;
totalP=0; totalP=0;
for(i=0; i<numberofListeningThreads; ++i){ for(i=0; i<numberofListeningThreads; ++i){
@ -982,7 +995,7 @@ void UDPStandardImplementation::startReadout(){
} }
} }
} }
}
//set status //set status
@ -1514,6 +1527,9 @@ void UDPStandardImplementation::startListening(){
tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame tempBuffer = new char[onePacketSize * (packetsPerFrame - 1)]; //store maximum of 1 packets less in a frame
} }
if(!activated)
usleep(1* 1000 * 1000);
/* inner loop - loop for each buffer */ /* inner loop - loop for each buffer */
//until mask unset (udp sockets shut down by client) //until mask unset (udp sockets shut down by client)
while((1 << ithread) & listeningThreadsMask){ while((1 << ithread) & listeningThreadsMask){
@ -1552,7 +1568,7 @@ void UDPStandardImplementation::startListening(){
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if (status == TRANSMITTING){ if ((status == TRANSMITTING)||(rc == 0 && activated == 0)){
stopListening(ithread,rc); stopListening(ithread,rc);
continue; continue;
} }
@ -1607,6 +1623,39 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
if(cSize) if(cSize)
memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize); memcpy(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, temp, cSize);
if(!activated){
//first time
if(!deactivated_framenumber[ithread])
deactivated_framenumber[ithread]++;
//new frame
if(deactivated_packetnumber[ithread] == (packetsPerFrame/numberofListeningThreads)){
deactivated_packetnumber[ithread] = 0;
deactivated_framenumber[ithread]++;
//last time
if(deactivated_framenumber[ithread] == (numberOfFrames+1))
return 0;
}
deactivated_packetnumber[ithread]++;
//copy dummy packet
memset(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, 0xFF,onePacketSize);
eiger_packet_header_t* header = (eiger_packet_header_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize);
eiger_packet_footer_t* footer = (eiger_packet_footer_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize + footerOffset);
*( (uint16_t*) header->missingPacket) = deactivatedPacketValue;
*( (uint64_t*) footer) = deactivated_framenumber[ithread];
*( (uint16_t*) footer->packetNumber) = deactivated_packetnumber[ithread];
//*( (uint16_t*) footer->packetNumber) = ithread*(packetsPerFrame/numberofListeningThreads)+deactivated_packetnumber[ithread];
#ifdef MANUALDEBUG
eiger_packet_footer_t* efooter = (eiger_packet_footer_t*)(buffer[ithread] + footerOffset + HEADER_SIZE_NUM_TOT_PACKETS);
cprintf(GREEN,"thread:%d pnum:%d fnum:%d\n",
ithread,
(*( (uint16_t*) efooter->packetNumber)),
(uint32_t)(*( (uint64_t*) efooter)));
#endif
return onePacketSize;
}
int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize); int receivedSize = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + cSize, lSize + cSize);
//throw away packets that is not one packet size, need to check status if socket is shut down //throw away packets that is not one packet size, need to check status if socket is shut down
@ -1622,6 +1671,7 @@ int UDPStandardImplementation::prepareAndListenBuffer(int ithread, int lSize, in
} }
totalListeningFrameCount[ithread] += (receivedSize/onePacketSize); totalListeningFrameCount[ithread] += (receivedSize/onePacketSize);
#ifdef MANUALDEBUG #ifdef MANUALDEBUG
if(receivedSize>0){ if(receivedSize>0){
if(myDetectorType == JUNGFRAU){ if(myDetectorType == JUNGFRAU){
@ -2565,6 +2615,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
cprintf(GREEN, "Total Packets Caught:%lld\n", (long long int)totalPacketsCaught); cprintf(GREEN, "Total Packets Caught:%lld\n", (long long int)totalPacketsCaught);
cprintf(GREEN, "Total Frames Caught:%lld\n",(long long int)(totalPacketsCaught/packetsPerFrame)); cprintf(GREEN, "Total Frames Caught:%lld\n",(long long int)(totalPacketsCaught/packetsPerFrame));
} }
if(!activated)
cprintf(RED,"Note: Deactivated Receiver\n");
//acquisition end //acquisition end
if (acquisitionFinishedCallBack) if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((int)(totalPacketsCaught/packetsPerFrame), pAcquisitionFinished); acquisitionFinishedCallBack((int)(totalPacketsCaught/packetsPerFrame), pAcquisitionFinished);

View File

@ -262,6 +262,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_ENABLE_RECEIVER_TEN_GIGA] = &slsReceiverTCPIPInterface::enable_tengiga; flist[F_ENABLE_RECEIVER_TEN_GIGA] = &slsReceiverTCPIPInterface::enable_tengiga;
flist[F_SET_RECEIVER_FIFO_DEPTH] = &slsReceiverTCPIPInterface::set_fifo_depth; flist[F_SET_RECEIVER_FIFO_DEPTH] = &slsReceiverTCPIPInterface::set_fifo_depth;
flist[F_ACTIVATE] = &slsReceiverTCPIPInterface::set_activate;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
@ -2720,8 +2721,75 @@ int slsReceiverTCPIPInterface::set_fifo_depth() {
int slsReceiverTCPIPInterface::set_activate() {
ret=OK;
int retval=-1;
int enable;
strcpy(mess,"Could not activate/deactivate\n");
// receive arguments
if(socket->ReceiveDataOnly(&enable,sizeof(enable)) < 0 ){
strcpy(mess,"Error reading from socket\n");
cprintf(RED,"%s",mess);
ret = FAIL;
}
// execute action if the arguments correctly arrived
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (ret==OK) {
if (lockStatus==1 && socket->differentClients==1){
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}
if(ret!=FAIL){
if (receiverBase == NULL){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
cprintf(RED,"%s",mess);
ret=FAIL;
}else if(receiverBase->getStatus()==RUNNING){
strcpy(mess,"Cannot activate/deactivate while status is running\n");
cprintf(RED,"%s",mess);
ret=FAIL;
}else{
if(enable != -1)
receiverBase->setActivate(enable);
retval = receiverBase->getActivate();
if(enable >= 0 && retval != enable){
sprintf(mess,"Tried to set activate to %d, but returned %d\n",enable,retval);
ret = FAIL;
cprintf(RED,"%s",mess);
}
}
}
}
#endif
#ifdef VERYVERBOSE
if(ret!=FAIL)
cout << "Activate: " << retval << endl;
else
cout << mess << endl;
#endif
if(ret==OK && socket->differentClients){
FILE_LOG(logDEBUG) << "Force update";
ret=FORCE_UPDATE;
}
// send answer
socket->SendDataOnly(&ret,sizeof(ret));
if(ret==FAIL){
cprintf(RED,"%s\n",mess);
socket->SendDataOnly(mess,sizeof(mess));
}
socket->SendDataOnly(&retval,sizeof(retval));
//return ok/fail
return ret;
}