This commit is contained in:
Dhanya Maliakal 2016-09-16 12:49:39 +02:00
parent 58713a90aa
commit 9c8f663b8e
8 changed files with 198 additions and 48 deletions

View File

@ -154,6 +154,13 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
*/
uint32_t getFrameToGuiFrequency() const;
/**
* Get the data stream enable
* @return 1 to send via zmq, else 0
*/
uint32_t getDataStreamEnable() const;
/**
* Get Acquisition Period
* @return acquisition period
@ -298,10 +305,17 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/**
* Set the Frequency of Frames Sent to GUI
* @param i 0 for random frame requests, n for nth frame frequency
* @param freq 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
int setFrameToGuiFrequency(const uint32_t i);
int setFrameToGuiFrequency(const uint32_t freq);
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
/**
* Set Acquisition Period
@ -525,7 +539,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
/* Short Frame Enable or index of adc enabled, else -1 if all enabled (gotthard specific) TODO: move to setROI */
int shortFrameEnable;
/** Frequency of Frames sent to GUI */
uint32_t FrameToGuiFrequency;
uint32_t frameToGuiFrequency;
/** Data Stream Enable from Receiver */
int32_t dataStreamEnable;

View File

@ -214,6 +214,12 @@ class UDPInterface {
*/
virtual uint32_t getFrameToGuiFrequency() const = 0;
/**
* Get the data stream enable
* @return 1 to send via zmq, else 0
*/
virtual uint32_t getDataStreamEnable() const = 0;
/**
* Get Acquisition Period
* @return acquisition period
@ -355,10 +361,17 @@ class UDPInterface {
/**
* Set the Frequency of Frames Sent to GUI
* @param i 0 for random frame requests, n for nth frame frequency
* @param freq 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
virtual int setFrameToGuiFrequency(const uint32_t i) = 0;
virtual int setFrameToGuiFrequency(const uint32_t freq) = 0;
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @return OK or FAIL
*/
virtual uint32_t setDataStreamEnable(const uint32_t enable) = 0;
/**
* Set Acquisition Period

View File

@ -107,12 +107,18 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
void setShortFrameEnable(const int i);
/**
* Overridden method
* Set the Frequency of Frames Sent to GUI
* @param i 0 for random frame requests, n for nth frame frequency
* @param freq 0 for random frame requests, n for nth frame frequency
* @return OK or FAIL
*/
int setFrameToGuiFrequency(const uint32_t i);
int setFrameToGuiFrequency(const uint32_t freq);
/**
* Set the data stream enable
* @param enable 0 to disable, 1 to enable
* @return OK or FAIL
*/
uint32_t setDataStreamEnable(const uint32_t enable);
/**
* Overridden method
@ -702,7 +708,6 @@ private:
/** Set to self-terminate data callback threads waiting for semaphores */
bool killAllDataCallbackThreads;
bool dataCallbackEnabled;
//***general and listening thread parameters***

View File

@ -176,6 +176,9 @@ private:
/** Sets the receiver to send every nth frame to gui, or only upon gui request */
int set_read_frequency();
/* Set the data stream enable */
int set_data_stream_enable();
/** Enable File Write*/
int enable_file_write();

View File

@ -49,7 +49,9 @@ enum {
F_ENABLE_RECEIVER_OVERWRITE, /**< set overwrite flag in receiver */
F_ENABLE_RECEIVER_TEN_GIGA, /**< enable 10Gbe in receiver */
F_SET_RECEIVER_FIFO_DEPTH /**< set receiver fifo depth */
F_SET_RECEIVER_FIFO_DEPTH, /**< set receiver fifo depth */
F_STREAM_DATA_FROM_RECEIVER /**< stream data from receiver to client */
/* Always append functions hereafter!!! */
};

View File

@ -75,7 +75,8 @@ void UDPBaseImplementation::initializeMembers(){
//***acquisition parameters***
shortFrameEnable = -1;
FrameToGuiFrequency = 0;
frameToGuiFrequency = 0;
dataStreamEnable = false;
}
UDPBaseImplementation::~UDPBaseImplementation(){}
@ -172,7 +173,9 @@ char *UDPBaseImplementation::getEthernetInterface() const{
/***acquisition parameters***/
int UDPBaseImplementation::getShortFrameEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return shortFrameEnable;}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return FrameToGuiFrequency;}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return frameToGuiFrequency;}
uint32_t UDPBaseImplementation::getDataStreamEnable() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return dataStreamEnable;}
uint64_t UDPBaseImplementation::getAcquisitionPeriod() const{ FILE_LOG(logDEBUG) << __AT__ << " starting"; return acquisitionPeriod;}
@ -314,16 +317,28 @@ void UDPBaseImplementation::setShortFrameEnable(const int i){
FILE_LOG(logINFO) << "Short Frame Enable: " << stringEnable(shortFrameEnable);
}
int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t i){
int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t freq){
FILE_LOG(logDEBUG) << __AT__ << " starting";
FrameToGuiFrequency = i;
FILE_LOG(logINFO) << "Frame To Gui Frequency:" << FrameToGuiFrequency;
frameToGuiFrequency = freq;
FILE_LOG(logINFO) << "Frame To Gui Frequency:" << frameToGuiFrequency;
//overrridden child classes might return FAIL
return OK;
}
uint32_t UDPBaseImplementation::setDataStreamEnable(const uint32_t enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
dataStreamEnable = enable;
FILE_LOG(logINFO) << "Streaming Data from Receiver:" << dataStreamEnable;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";

View File

@ -195,7 +195,8 @@ void UDPStandardImplementation::initializeMembers(){
numberofDataCallbackThreads = 1;
dataCallbackThreadsMask = 0x0;
killAllDataCallbackThreads = false;
dataCallbackEnabled = true; /**false*/
dataStreamEnable = false;
//***general and listening thread parameters***
threadStarted = false;
@ -282,8 +283,8 @@ int UDPStandardImplementation::setupFifoStructure(){
//else calculate best possible number of frames to listen to at a time (for fast readouts like gotthard)
else{
//if frequency to gui is not random (every nth frame), then listen to only n frames per buffer
if(FrameToGuiFrequency)
numberofJobsPerBuffer = FrameToGuiFrequency;
if(frameToGuiFrequency)
numberofJobsPerBuffer = frameToGuiFrequency;
//random frame sent to gui, then frames per buffer depends on acquisition period
else{
//calculate 100ms/period to get frames to listen to at a time
@ -445,17 +446,13 @@ void UDPStandardImplementation::setFileName(const char c[]){
}
if(dataCallbackEnabled && (strcmp(oldfilename,fileName))){cout<<"***Going to destroy data callback threads and create!!!"<<endl;
if(zmqThreadStarted){
if(dataStreamEnable && (strcmp(oldfilename,fileName))){
if(zmqThreadStarted)
createDataCallbackThreads(true);
zmqThreadStarted = false;
}
cout<<"***datacallback threads destroyed"<<endl;
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(createDataCallbackThreads() == FAIL){
cprintf(BG_RED,"Error: Could not create data callback threads\n");
}
cout<<"data call back threads created"<<endl;
}
@ -552,19 +549,52 @@ void UDPStandardImplementation::setShortFrameEnable(const int i){
}
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t i){
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq){
FILE_LOG(logDEBUG) << __AT__ << " called";
FrameToGuiFrequency = i;
frameToGuiFrequency = freq;
if(setupFifoStructure() == FAIL)
return FAIL;
FILE_LOG(logINFO) << "Frame to Gui Frequency: " << FrameToGuiFrequency;
FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency;
return OK;
}
uint32_t UDPStandardImplementation::setDataStreamEnable(const uint32_t enable){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout<<"************datasend:"<<enable<<endl;
int olddatasend = dataStreamEnable;
dataStreamEnable = enable;
//if there is a change
if(olddatasend != dataStreamEnable){
cout<<"***Going to destroy data callback threads and create!!!"<<endl;
if(zmqThreadStarted)
createDataCallbackThreads(true);
cout<<"***datacallback threads destroyed"<<endl;
if(dataStreamEnable){
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(createDataCallbackThreads() == FAIL){
cprintf(BG_RED,"Error: Could not create data callback threads\n");
}
cout<<"data call back threads created"<<endl;
}
}
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
return OK;
}
int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " called";
@ -839,7 +869,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
setupFifoStructure();
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(dataCallbackEnabled)
if(dataStreamEnable)
createDataCallbackThreads();
//allocate for latest data (frame copy for gui), free variables
@ -939,8 +969,8 @@ int UDPStandardImplementation::startReceiver(char *c){
}
FILE_LOG(logINFO) << "Number of Jobs Per Buffer: " << numberofJobsPerBuffer;
FILE_LOG(logINFO) << "Max Frames Per File:" << maxFramesPerFile;
if(FrameToGuiFrequency)
FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << FrameToGuiFrequency;
if(frameToGuiFrequency)
FILE_LOG(logINFO) << "Frequency of frames sent to gui: " << frameToGuiFrequency;
else
FILE_LOG(logINFO) << "Frequency of frames sent to gui: Random";
@ -980,7 +1010,7 @@ int UDPStandardImplementation::startReceiver(char *c){
listeningThreadsMask|=(1<<i);
for(int i=0;i<numberofWriterThreads;i++)
writerThreadsMask|=(1<<i);
if(dataCallbackEnabled){
if(dataStreamEnable){
for(int i=0;i<numberofDataCallbackThreads;i++)
dataCallbackThreadsMask|=(1<<i);
}
@ -992,7 +1022,7 @@ int UDPStandardImplementation::startReceiver(char *c){
sem_post(&listenSemaphore[i]);
for(int i=0; i < numberofWriterThreads; i++)
sem_post(&writerSemaphore[i]);
if(dataCallbackEnabled){
if(dataStreamEnable){
for(int i=0;i<numberofDataCallbackThreads;i++)
sem_post(&dataCallbackSemaphore[i]);
}
@ -1639,8 +1669,7 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
//set current thread value index
int ithread = currentThreadIndex;
//let calling function know thread started and obtained current
zmqThreadStarted = 1;
// server address to bind
char hostName[100] = "tcp://127.0.0.1:";
@ -1668,6 +1697,9 @@ void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data
// bind
zmq_bind(zmqsocket,hostName);
//let calling function know thread started and obtained current (after sockets created)
if(!zmqThreadStarted)
zmqThreadStarted = true;
currentfnum = -1;
/* inner loop - loop for each buffer */
@ -2404,7 +2436,7 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread);
#endif
if(dataCallbackEnabled){
if(dataStreamEnable){
//ensure previous frame was processed
sem_wait(&writerGuiSemaphore[ithread]);
guiNumPackets[ithread] = dummyPacketValue;
@ -2518,7 +2550,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//copy frame for gui
//if(npackets >= (packetsPerFrame/numberofListeningThreads))
if(dataCallbackEnabled && npackets)
if(dataStreamEnable && npackets)
copyFrameToGui(ithread, wbuffer,npackets);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n");
@ -2700,7 +2732,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
FILE_LOG(logDEBUG) << __AT__ << " called";
//if nthe frame, wait for your turn (1st frame always shown as its zero)
if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency));
if(frameToGuiFrequency && ((frametoGuiCounter[ithread])%frameToGuiFrequency));
//random read (gui ready) or nth frame read: gui needs data now or it is the first frame
else{
@ -2725,7 +2757,7 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32
}
//update the counter for nth frame
if(FrameToGuiFrequency)
if(frameToGuiFrequency)
frametoGuiCounter[ithread]++;
@ -2852,7 +2884,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
#endif
if(!once){
if(dataCallbackEnabled)
if(dataStreamEnable)
copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame);
once = 1;
}

View File

@ -262,7 +262,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_ENABLE_RECEIVER_TEN_GIGA] = &slsReceiverTCPIPInterface::enable_tengiga;
flist[F_SET_RECEIVER_FIFO_DEPTH] = &slsReceiverTCPIPInterface::set_fifo_depth;
flist[F_STREAM_DATA_FROM_RECEIVER] = &slsReceiverTCPIPInterface::set_data_stream_enable;
#ifdef VERYVERBOSE
for (int i=0;i<numberOfFunctions;i++)
@ -2096,24 +2096,25 @@ int slsReceiverTCPIPInterface::set_read_frequency(){
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set receiver frequency mode while receiver not idle\n");
cprintf(RED,"%s\n",mess);
ret = FAIL;
}
/*
else if((receiverBase->getStatus()==RUNNING) && (index >= 0)){
ret = FAIL;
strcpy(mess,"cannot set up receiver mode when receiver is running\n");
}*/
else{
if(index >= 0){
if(index >= 0 ){
ret = receiverBase->setFrameToGuiFrequency(index);
if(ret == FAIL)
if(ret == FAIL){
strcpy(mess, "Could not allocate memory for listening fifo\n");
cprintf(RED,"%s\n",mess);
}
}
retval=receiverBase->getFrameToGuiFrequency();
if(index>=0 && retval!=index)
if(index>=0 && retval!=index){
strcpy(mess,"Could not set frame to gui frequency");
cprintf(RED,"%s\n",mess);
ret = FAIL;
}
}
}
#endif
@ -2138,6 +2139,69 @@ int slsReceiverTCPIPInterface::set_read_frequency(){
int slsReceiverTCPIPInterface::set_data_stream_enable(){
ret=OK;
int retval=-1;
int index;
strcpy(mess,"Could not set data stream enable\n");
// receive arguments
if(socket->ReceiveDataOnly(&index,sizeof(index)) < 0 ){
strcpy(mess,"Error reading from socket\n");
ret = FAIL;
}
// execute action if the arguments correctly arrived
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (ret==OK) {
if (lockStatus==1 && socket->differentClients==1){
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}
else if (receiverBase == NULL){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set data stream enable while receiver not idle\n");
cprintf(RED,"%s\n",mess);
ret = FAIL;
}
else{
if(index >= 0 )
ret = receiverBase->setDataStreamEnable(index);
retval=receiverBase->getDataStreamEnable();
if(index>=0 && retval!=index){
strcpy(mess,"Could not set data stream enable");
cprintf(RED,"%s\n",mess);
ret = FAIL;
}
}
}
#endif
if(ret==OK && socket->differentClients){
FILE_LOG(logDEBUG) << "Force update";
ret=FORCE_UPDATE;
}
// send answer
socket->SendDataOnly(&ret,sizeof(ret));
if(ret==FAIL){
cprintf(RED,"%s\n",mess);
socket->SendDataOnly(mess,sizeof(mess));
}
socket->SendDataOnly(&retval,sizeof(retval));
//return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::enable_file_write(){
ret=OK;