almost done

This commit is contained in:
Dhanya Maliakal
2016-09-09 17:52:07 +02:00
parent 258e671420
commit 3ed738b949
7 changed files with 743 additions and 202 deletions

View File

@ -17,6 +17,9 @@
#include <string.h>
#include <stdint.h>
#include <ctime>
#include <zmq.h> //zmq
using namespace std;
#define WRITE_HEADERS
@ -44,6 +47,7 @@ UDPStandardImplementation::UDPStandardImplementation(){
}else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")){
FILE_LOG(logDEBUG) << "Warning: No root permission to change max length of input queue in file /proc/sys/net/core/netdev_max_backlog";
}
/** permanent setting by heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
@ -85,12 +89,12 @@ void UDPStandardImplementation::deleteMembers(){
}
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(latestData[i]) {delete[] latestData[i]; latestData[i] = NULL;}
guiData[i] = NULL;
}
//kill threads
if(threadStarted){
createListeningThreads(true);
createWriterThreads(true);
threadStarted = false;
}
}
@ -124,6 +128,7 @@ void UDPStandardImplementation::initializeMembers(){
FILE_LOG(logDEBUG) << "Info: Initializing members";
//***detector parameters***
detID = 0;
bufferSize = 0;
onePacketSize = 0;
oneDataSize = 0;
@ -180,13 +185,13 @@ void UDPStandardImplementation::initializeMembers(){
//***receiver to GUI parameters***
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
latestData[i] = NULL;
guiDataReady[i] = false;
guiData[i] = NULL;
guiNumPackets[i] = 0;
strcpy(guiFileName[i],"");
frametoGuiCounter[i] = 0;
}
//***data callback thread parameters***
zmqThreadStarted = false;
numberofDataCallbackThreads = 1;
dataCallbackThreadsMask = 0x0;
killAllDataCallbackThreads = false;
@ -321,6 +326,7 @@ int UDPStandardImplementation::setupFifoStructure(){
if(threadStarted){
createListeningThreads(true);
createWriterThreads(true);
threadStarted = false;
}
@ -416,6 +422,44 @@ void UDPStandardImplementation::configure(map<string, string> config_map){
}
void UDPStandardImplementation::setFileName(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
char oldfilename[MAX_STR_LENGTH];
strcpy(oldfilename,fileName);
if(strlen(c))
strcpy(fileName, c);
if(strlen(fileName)){
int detindex = -1;
string tempname(fileName);
size_t uscore=tempname.rfind("_");
if (uscore!=string::npos){
if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"d%d",&detindex)) {
detID = detindex;
}
}
if(detindex == -1)
detID = 0;
}
if(dataCallbackEnabled && (strcmp(oldfilename,fileName))){cout<<"***Going to destroy data callback threads and create!!!"<<endl;
if(zmqThreadStarted){
createDataCallbackThreads(true);
zmqThreadStarted = false;
}
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(createDataCallbackThreads() == FAIL){
cprintf(BG_RED,"Error: Could not create data callback threads\n");
}
}
FILE_LOG(logINFO) << "File name:" << fileName;
}
int UDPStandardImplementation::setDataCompressionEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -442,9 +486,9 @@ int UDPStandardImplementation::setDataCompressionEnable(const bool b){
else
numberofWriterThreads = 1;
if(createWriterThreads() == FAIL){
cprintf(BG_RED,"Error: Could not create writer threads\n");
return FAIL;
}
cprintf(BG_RED,"Error: Could not create writer threads\n");
return FAIL;
}
//-- end of create writer threads
setThreadPriorities();
@ -779,22 +823,26 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
pthread_mutex_unlock(&(statusMutex));
if(threadStarted){
createListeningThreads(true);
createDataCallbackThreads(true);
createWriterThreads(true);
}
numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS;
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS;
}
if(zmqThreadStarted)
createDataCallbackThreads(true);
//set up fifo structure -1 for numberofJobsPerBuffer ensure it is done
numberofJobsPerBuffer = -1;
setupFifoStructure();
numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS;
if(dataCallbackEnabled)
createDataCallbackThreads();
//allocate for latest data (frame copy for gui), free variables
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++){
if(latestData[i]) {delete[] latestData[i]; latestData[i] = NULL;}
guiData[i] = NULL;
latestData[i] = new char[bufferSize];
}
@ -803,7 +851,6 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
if(myDetectorType == EIGER){
for(int i=0; i<MAX_NUMBER_OF_WRITER_THREADS; i++)
updateFileHeader(i);
createDataCallbackThreads();
}
FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d);
@ -867,8 +914,7 @@ int UDPStandardImplementation::startReceiver(char *c){
}
//reset gui variables
frametoGuiCounter[i] = 0;
guiData[i] = NULL;
guiDataReady[i]=0;
guiNumPackets[i] = 0;
strcpy(guiFileName[i],"");
}
@ -920,8 +966,10 @@ int UDPStandardImplementation::startReceiver(char *c){
sprintf(completeFileName[0], "%s/%s_fxxx_%lld_xx.root", filePath,fileNamePerThread[0],(long long int)fileIndex);
//initialize semaphore to synchronize between writer and gui reader threads
for(int i=0;i<numberofWriterThreads;i++)
for(int i=0;i<numberofWriterThreads;i++){
sem_init(&writerGuiSemaphore[i],1,0);
sem_init(&dataCallbackWriterSemaphore[i],1,0);
}
//status and thread masks
pthread_mutex_lock(&statusMutex);
@ -968,14 +1016,14 @@ void UDPStandardImplementation::stopReceiver(){
//wait until status is run_finished
while(status == TRANSMITTING){
for(int i=0; i < numberofWriterThreads; i++)
sem_post(&writerGuiSemaphore[i]);
usleep(5000);
}
//semaphore destroy
for(int i=0; i < numberofWriterThreads; i++)
for(int i=0; i < numberofWriterThreads; i++){
sem_destroy(&writerGuiSemaphore[i]);
sem_destroy(&dataCallbackWriterSemaphore[i]);
}
//change status
pthread_mutex_lock(&statusMutex);
@ -1077,71 +1125,9 @@ void UDPStandardImplementation::startReadout(){
void UDPStandardImplementation::readFrame(int ithread, char* c,char** raw, int64_t &startAcq, int64_t &startFrame){
FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data, to let writer thread know that gui is back for data
if (guiData[ithread] == NULL){
guiData[ithread] = latestData[ithread];
#ifdef DEBUG4
cprintf(CYAN,"Info: gui data not null anymore - ready to get data\n");
#endif
}
//copy data and filename
strcpy(c,guiFileName[ithread]);
startAcq = startAcquisitionIndex;
startFrame = startFrameIndex;
//gui data not copied yet
if(!guiDataReady[ithread]){
#ifdef DEBUG4
cprintf(CYAN,"Info: gui data not ready\n");
#endif
*raw = NULL;
}
//gui data ready, pass address to gui to copy the data
else{
#ifdef DEBUG4
cprintf(CYAN,"Info: gui data ready\n");
#endif
*raw = guiData[ithread];
guiData[ithread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting
if((FrameToGuiFrequency) && (writerThreadsMask)){
#ifdef DEBUG4
cprintf(CYAN,"Info: gonna post\n");
#endif
//release after getting data
sem_post(&writerGuiSemaphore[ithread]);
}
#ifdef DEBUG4
cprintf(CYAN,"Info: done post\n");
#endif
}
}
/*
void UDPStandardImplementation::resetGuiPointer(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called";
guiData[ithread] = NULL;
//for nth frame to gui, post semaphore so writer stops waiting
if((FrameToGuiFrequency) && (writerThreadsMask)){
#ifdef DEBUG4
cprintf(CYAN,"Info: gonna post\n");
#endif
//release after getting data
sem_post(&writerGuiSemaphore[ithread]);
}
#ifdef DEBUG4
cprintf(CYAN,"Info: done post\n");
#endif
}
*/
void UDPStandardImplementation::closeFile(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called for " << ithread ;
@ -1220,10 +1206,11 @@ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){
for(int i = 0; i < numberofDataCallbackThreads; ++i){
sem_post(&dataCallbackSemaphore[i]);
pthread_join(dataCallbackThreads[i],NULL);
sem_destroy(&dataCallbackSemaphore[i]);
FILE_LOG(logDEBUG) << "." << flush;
}
killAllDataCallbackThreads = false;
threadStarted = false;
zmqThreadStarted = false;
FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) destroyed";
}
@ -1236,13 +1223,13 @@ int UDPStandardImplementation::createDataCallbackThreads(bool destroy){
for(int i = 0; i < numberofDataCallbackThreads; ++i){
sem_init(&dataCallbackSemaphore[i],1,0);
threadStarted = false;
zmqThreadStarted = false;
currentThreadIndex = i;
if(pthread_create(&dataCallbackThreads[i], NULL,startDataCallbackThread, (void*) this)){
FILE_LOG(logERROR) << "Could not create listening thread with index " << i;
return FAIL;
}
while(!threadStarted);
while(!zmqThreadStarted);
FILE_LOG(logDEBUG) << "." << flush;
}
FILE_LOG(logDEBUG) << "Info: Data Callback thread(s) created successfully.";
@ -1270,6 +1257,7 @@ int UDPStandardImplementation::createListeningThreads(bool destroy){
for(int i = 0; i < numberofListeningThreads; ++i){
sem_post(&listenSemaphore[i]);
pthread_join(listeningThreads[i],NULL);
sem_destroy(&listenSemaphore[i]);
FILE_LOG(logDEBUG) << "." << flush;
}
killAllListeningThreads = false;
@ -1321,6 +1309,7 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){
for(int i = 0; i < numberofWriterThreads; ++i){
sem_post(&writerSemaphore[i]);
pthread_join(writingThreads[i],NULL);
sem_destroy(&writerSemaphore[i]);
FILE_LOG(logDEBUG) <<"."<<flush;
}
killAllWritingThreads = false;
@ -1643,54 +1632,128 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
void UDPStandardImplementation::startDataCallback(){
void UDPStandardImplementation::startDataCallback(){cprintf(MAGENTA,"start data call back thread started %d\n",currentThreadIndex);
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
int ithread = currentThreadIndex;
//let calling function know thread started and obtained current
threadStarted = 1;
char* buffer;
zmqThreadStarted = 1;
// server address to bind
const char *hostName = "tcp://127.0.0.1:70001";/**increment this by ithread and detid*/
char hostName[100] = "tcp://127.0.0.1:";
int portno = DEFAULT_ZMQ_PORTNO + (detID*2+ithread);
sprintf(hostName,"%s%d",hostName,portno);
FILE_LOG(logINFO) << "Thread" << ithread << ": ZMQ Server at " << hostName;
/* outer loop - loops once for each acquisition */
//infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again)
while(true){
int oneframesize = oneDataSize * packetsPerFrame;
char* buffer = new char[packetsPerFrame*oneDataSize];
memset(buffer,0xFF,oneframesize);
int bufferoffset = 0;
int size = 0;
int offset=0;
int currentfnum = 0;
uint64_t fnum = 0;
uint32_t pnum = 0;
void *context = zmq_ctx_new();
// create a publisher
socket = zmq_socket(context, ZMQ_PUB);
void *zmqsocket = zmq_socket(context, ZMQ_PUB);
// bind
zmq_bind(socket,hostName);/**increment this by 1*/
zmq_bind(zmqsocket,hostName);
currentfnum = -1;
/* inner loop - loop for each buffer */
//until mask reset (udp sockets shut down by client)
//until mask reset (dummy pcaket got by writer)
while((1 << ithread) & dataCallbackThreadsMask){
//wait for data
sem_wait(&dataCallbackSemaphore[ithread]);
if(status == TRANSMITTING)
//let the writer thread continue, while we process carry over if any
sem_post(&writerGuiSemaphore[ithread]);
//wait for receiver to send more data
sem_wait(&dataCallbackWriterSemaphore[ithread]);
//everything is done
if(guiNumPackets[ithread] == dummyPacketValue){
/**suing this in clientzmq_msg_more,
* in serve use zmq_msg_send (&message, sender, ZMQ_SNDMORE); and 0 for last packet, but better to check lengt*/
zmq_send (zmqsocket, "end", 3, 0);
pthread_mutex_lock(&statusMutex);
dataCallbackThreadsMask^=(1<<ithread);
pthread_mutex_unlock(&statusMutex);
continue;
int numpackets = (uint32_t)(*( (uint32_t*) latestData)); /*latestdata should be size of one buffer*/
memcpy(buffer, latestData, numpackets*onePacketSize);/**read first bytes to get numpackets*/
}
size = guiNumPackets[ithread]*onePacketSize;
offset=0;
while(offset < size){
//until getting frame number is not error
while((size>0) && (getFrameandPacketNumber(ithread, latestData[ithread]+offset, fnum, pnum)==FAIL)){
offset+= onePacketSize;
}
//end of buffer
if(offset >= size)
break;
//new frame
if(currentfnum==-1){
currentfnum = fnum;
}
//last packet
if(pnum == packetsPerFrame){
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
offset+= onePacketSize;
zmq_send(zmqsocket, buffer, oneframesize, 0);
memset(buffer,0xFF,oneframesize);
currentfnum = -1;
}
//same frame (not last) or next frame
else {
//next frame
if(fnum > currentfnum){
zmq_send(zmqsocket, buffer, oneframesize, 0);
memset(buffer,0xFF,oneframesize);
currentfnum = fnum;
}
memcpy(buffer+((pnum-1)*oneDataSize), latestData[ithread]+offset+8,oneDataSize);
offset+= onePacketSize;
}
}
/*check if it should be added to previous (processlistening buffer for datacompression)*/
zmq_send(socket, buffer.data(), buffer.size(), 0);
}/*--end of loop for each buffer (inner loop)*/
//free resources
delete[] buffer;
zmq_unbind(zmqsocket, hostName);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//end of acquisition, wait for next acquisition/change of parameters
sem_wait(&dataCallbackSemaphore[ithread]);
//check to exit thread (for change of parameters) - only EXIT possibility
if(killAllDataCallbackThreads){
cprintf(BLUE,"DataCallback_Thread %d:Goodbye!\n",ithread);
//free resources at exit
zmq_unbind(socket, hostName);
zmq_close(socket);
zmq_ctx_destroy(context);
cprintf(MAGENTA,"DataCallback_Thread %d:Goodbye!\n",ithread);
pthread_exit(NULL);
}
@ -1701,7 +1764,7 @@ void UDPStandardImplementation::startDataCallback(){
void UDPStandardImplementation::startListening(){
void UDPStandardImplementation::startListening(){cprintf(BLUE,"startlistening thread started %d\n",currentThreadIndex);
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
@ -2136,7 +2199,7 @@ uint32_t UDPStandardImplementation::processListeningBuffer(int ithread, int &cSi
void UDPStandardImplementation::startWriting(){
void UDPStandardImplementation::startWriting(){cprintf(GREEN,"start writing thread started %d\n",currentThreadIndex);
FILE_LOG(logDEBUG) << __AT__ << " called";
//set current thread value index
@ -2157,7 +2220,6 @@ void UDPStandardImplementation::startWriting(){
//--reset parameters before acquisition
nf = 0;
guiData[ithread] = latestData[ithread]; //so that the first frame is always copied
if(dataCompressionEnable)
listenfifoIndex = 0; //compression has only one listening thread
@ -2255,7 +2317,7 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
if(myDetectorType == EIGER){
int detindex = -1;
string tempname(fileName);
//detid (more than 1 half module)
size_t uscore=tempname.rfind("_");
if (uscore!=string::npos){
if (sscanf(tempname.substr(uscore+1,tempname.size()-uscore-1).c_str(),"d%d",&detindex)) {
@ -2263,11 +2325,12 @@ void UDPStandardImplementation::waitWritingBufferForNextAcquisition(int ithread)
sprintf(fileNamePerThread[ithread],"%s_d%d",tempname.c_str(),detindex*2+ithread);
}
}
//only one half module, so no detid
if(detindex == -1)
sprintf(fileNamePerThread[ithread],"%s_d%d",fileName,ithread);
}
}else
strcpy(fileNamePerThread[0],fileName);
if(dataCompressionEnable){
#ifdef MYROOT1
@ -2335,6 +2398,14 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
cprintf(YELLOW,"Writing_Thread %d: Freeing dummy-end buffer. Pushed into fifofree %p for listener %d\n", ithread,(void*)(wbuffer),ithread);
#endif
if(dataCallbackEnabled){
//ensure previous frame was processed
sem_wait(&writerGuiSemaphore[ithread]);
guiNumPackets[ithread] = dummyPacketValue;
//let it know its got data
sem_post(&dataCallbackWriterSemaphore[ithread]);
}
//all threads need to close file, reset mask and exit loop
closeFile(ithread);
@ -2349,18 +2420,12 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//thread 0 waits for all threads to finish & print statistics
if(ithread == 0){
//wait for all other threads
if(dataCompressionEnable){
cprintf(GREEN,"Writing_Thread %d: Waiting for jobs to be done.. current mask:0x%x\n",ithread, writerThreadsMask);
while(writerThreadsMask){
/*cout << "." << flush;*/
usleep(50000);
}
cprintf(GREEN,"Writing_Thread %d: Jobs Done!\n",ithread);
}
while(writerThreadsMask)
usleep(5000);
//ensure listening threads done before updating status as it returns to client (from stopReceiver)
while(listeningThreadsMask)
usleep(5000);
//ensure datacallbacks threads are done
while(dataCallbackThreadsMask)
usleep(5000);
//update status
@ -2412,7 +2477,8 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//get current frame number
uint64_t tempframenumber;
if(getFrameNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber) == FAIL){
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS,tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
@ -2432,7 +2498,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//callback to write data
if (cbAction < DO_EVERYTHING)
rawDataReadyCallBack((int)currentFrameNumber[ithread], wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, npackets * onePacketSize,
sfilefd[ithread], guiData[ithread],pRawDataReady);//know which thread from sfilefd
sfilefd[ithread], latestData[ithread],pRawDataReady);//know which thread from sfilefd
@ -2446,7 +2512,7 @@ void UDPStandardImplementation::handleWithoutDataCompression(int ithread, char*
//copy frame for gui
//if(npackets >= (packetsPerFrame/numberofListeningThreads))
if(npackets)
if(dataCallbackEnabled && npackets)
copyFrameToGui(ithread, wbuffer,npackets);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: Copied frame\n");
@ -2487,8 +2553,9 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
if(numpackets &&(lastFrameNumberInFile[ithread]>=0)){
//get start frame (required to create new file at the right juncture)
uint64_t startframe =-1;
uint32_t pnum;
//if(ithread) cout<<"getting start frame number"<<endl;
if(getFrameNumber(ithread, wbuffer + offset, startframe) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, startframe,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2545,7 +2612,8 @@ void UDPStandardImplementation::writeFileWithoutCompression(int ithread, char* w
if(numpackets){
//get last frame number
uint64_t finalLastFrameNumberToSave = 0;
if(getFrameNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave) == FAIL){
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS + ((numpackets - 1) * onePacketSize), finalLastFrameNumberToSave,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2621,50 +2689,32 @@ void UDPStandardImplementation::updateFileHeader(int ithread){
}
//called only if datacallback enabled
void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32_t numpackets){
FILE_LOG(logDEBUG) << __AT__ << " called";
//if nthe frame, wait for your turn (1st frame always shown as its zero)
if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency));
//random read (gui ready) or nth frame read: gui needs data now or it is the first frame
else{
//tell datacallback to pick up data
sem_post(&dataCallbackSemaphore[ithread]);
memcpy(latestData[ithread],buffer , numpackets*onePacketSize);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n");
cprintf(GREEN,"Writing_Thread: CopyingFrame: Going to copy data\n");
#endif
pthread_mutex_lock(&dataReadyMutex);
guiDataReady[ithread]=0;
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: guidataready is 0, Copying data\n");
#endif
memcpy(latestData[ithread],buffer , numpackets*onePacketSize);
//ensure previous frame was processed
sem_wait(&writerGuiSemaphore[ithread]);
//copy date
guiNumPackets[ithread] = numpackets;
strcpy(guiFileName[ithread],completeFileName[ithread]);
guiDataReady[ithread]=1;
pthread_mutex_unlock(&dataReadyMutex);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Copied Data, guidataready is 1\n");
#endif
memcpy(latestData[ithread],buffer+ HEADER_SIZE_NUM_TOT_PACKETS , numpackets*onePacketSize);
//let it know its got data
sem_post(&dataCallbackWriterSemaphore[ithread]);
//nth frame read, block current process if the guireader hasnt read it yet
if(FrameToGuiFrequency){
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Waiting after copying\n");
cprintf(GREEN,"Writing_Thread: CopyingFrame: Copied Data\n");
#endif
sem_wait(&writerGuiSemaphore[ithread]);
#ifdef DEBUG4
cprintf(GREEN,"Writing_Thread: CopyingFrame: Done waiting\n");
#endif
}
}
@ -2684,7 +2734,8 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
//get frame number
uint64_t tempframenumber=-1;
if(getFrameNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber) == FAIL){
uint32_t pnum;
if(getFrameandPacketNumber(ithread, wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return;
@ -2795,7 +2846,8 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
#endif
if(!once){
copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame);
if(dataCallbackEnabled)
copyFrameToGui(ithread, buff[0],(uint32_t)packetsPerFrame);
once = 1;
}
}
@ -2819,59 +2871,60 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
int UDPStandardImplementation::getFrameNumber(int ithread, char* wbuffer, uint64_t &tempframenumber){
int UDPStandardImplementation::getFrameandPacketNumber(int ithread, char* wbuffer, uint64_t &framenumber, uint32_t &packetnumber){
FILE_LOG(logDEBUG) << __AT__ << " called";
eiger_packet_footer_t* footer=0;
jfrau_packet_header_t* header=0;
int pnum=-1;
switch(myDetectorType){
case EIGER:
footer = (eiger_packet_footer_t*)(wbuffer + footerOffset);
tempframenumber = (uint32_t)(*( (uint64_t*) footer));
framenumber = (uint32_t)(*( (uint64_t*) footer));
//error in frame number sent by fpga
if(!((uint32_t)(*( (uint64_t*) footer)))){
tempframenumber = -1;
framenumber = -1;
FILE_LOG(logERROR) << "Fifo "<< ithread << ": Frame Number is zero from firmware.";
return FAIL;
}
packetnumber = (*( (uint16_t*) footer->packetNumber));
#ifdef DEBUG4
if(!ithread) cprintf(GREEN,"Writing_Thread %d: fnum:%lld pnum:%d FPGA_fnum:%d footeroffset:%d\n",
ithread,
(long long int)tempframenumber,
(*( (uint16_t*) footer->packetNumber)),
(uint32_t)(*( (uint64_t*) footer)),
(long long int)framenumber,
packetnumber,
framenumber,
footerOffset);
#endif
tempframenumber += (startFrameIndex - 1);
framenumber += (startFrameIndex - 1);
break;
case JUNGFRAU:
header = (jfrau_packet_header_t*)(wbuffer);
tempframenumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
framenumber = (*( (uint32_t*) header->frameNumber))&frameIndexMask;
packetnumber = (uint32_t)(*( (uint8_t*) header->packetNumber));
#ifdef DEBUG4
cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n",
(long long int)tempframenumber,
(*( (uint8_t*) header->packetNumber)));
(long long int)framenumber,
packetnumber);
#endif
tempframenumber += startFrameIndex;
framenumber += startFrameIndex;
break;
default:
tempframenumber = ((uint32_t)(*((uint32_t*)(wbuffer))));
framenumber = ((uint32_t)(*((uint32_t*)(wbuffer))));
//for gotthard and normal frame, increment frame number to separate fnum and pnum
if (myDetectorType == PROPIX ||(myDetectorType == GOTTHARD && shortFrameEnable == -1))
tempframenumber++;
pnum = tempframenumber&packetIndexMask;
tempframenumber = (tempframenumber & frameIndexMask) >> frameIndexOffset;
framenumber++;
packetnumber = framenumber&packetIndexMask;
framenumber = (framenumber & frameIndexMask) >> frameIndexOffset;
#ifdef DEBUG4
cprintf(GREEN, "Writing_Thread %d: fnum:%lld\t pnum:%d\n",
(long long int)tempframenumber,
pnum);
(long long int)framenumber,
packetnumber);
#endif
tempframenumber += startFrameIndex;
framenumber += startFrameIndex;
break;
}
return OK;
@ -2890,9 +2943,10 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
int endoffset = startoffset + numpackets * onePacketSize;
uint64_t tempframenumber=-1;
offset = endoffset;
uint32_t pnum;
//get last frame number
if(getFrameNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + (endoffset-onePacketSize), tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -2918,7 +2972,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
offset -= bigIncrements;
if(offset<startoffset)
break;//if(ithread) cout<<"frame number at going backwards fast f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -2926,7 +2980,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
}
if(offset<startoffset){
offset = startoffset;//if(ithread) cout<<"offset < start offset f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;
@ -2934,7 +2988,7 @@ int UDPStandardImplementation::writeUptoFrameNumber(int ithread, char* wbuffer,
}
while(tempframenumber<nextFrameNumber){
offset += onePacketSize;//if(ithread) cout<<"frame number at going forwards slow f#:"<<tempframenumber<< " offset:"<<offset<<endl;
if(getFrameNumber(ithread, wbuffer + offset, tempframenumber) == FAIL){
if(getFrameandPacketNumber(ithread, wbuffer + offset, tempframenumber,pnum) == FAIL){
//error in frame number sent by fpga
while(!fifoFree[ithread]->push(wbuffer));
return FAIL;

View File

@ -353,6 +353,10 @@ int slsReceiverTCPIPInterface::set_detector_type(){
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}
else if((receiverBase)&&(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING)){
strcpy(mess,"Can not set detector type while receiver not idle\n");
ret = FAIL;
}
else{
switch(dr){
@ -443,6 +447,10 @@ int slsReceiverTCPIPInterface::set_file_name() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set file name while receiver not idle\n");
ret = FAIL;
}
else{
receiverBase->setFileName(fName);
retval = receiverBase->getFileName();
@ -506,15 +514,15 @@ int slsReceiverTCPIPInterface::set_file_dir() {
if (lockStatus==1 && socket->differentClients==1){
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}/*
else if((strlen(fPath))&&(receiverBase->getStatus()==RUNNING)){
strcpy(mess,"Can not set file path while receiver running\n");
ret = FAIL;
}*/
}
else if (receiverBase == NULL){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set file path while receiver not idle\n");
ret = FAIL;
}
else{
receiverBase->setFilePath(fPath);
retval = receiverBase->getFilePath();
@ -584,6 +592,10 @@ int slsReceiverTCPIPInterface::set_file_index() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set file index while receiver not idle\n");
ret = FAIL;
}
else{
if(index >= 0)
receiverBase->setFileIndex(index);
@ -648,6 +660,10 @@ int slsReceiverTCPIPInterface::set_frame_index() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set frame index while receiver not idle\n");
ret = FAIL;
}
else{
//client sets to 0, but for receiver it is just an enable
//client uses this value for other detectors not using receiver,
@ -725,9 +741,9 @@ int slsReceiverTCPIPInterface::setup_udp(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING){
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set up udp while receiver not idle\n");
ret = FAIL;
strcpy(mess,"cannot set up udp when receiver is running\n");
}
else{
//set up udp port
@ -859,9 +875,12 @@ int slsReceiverTCPIPInterface::stop_receiver(){
ret=FAIL;
}
else{
if(receiverBase->getStatus()!=IDLE)
if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
receiverBase->stopReceiver();
cout<<"receiver stopped"<<endl;
}
s = receiverBase->getStatus();
cout<<"to stop, receiver status:"<<s<<endl;
if(s==IDLE)
ret = OK;
else{
@ -1060,7 +1079,7 @@ int slsReceiverTCPIPInterface::set_short_frame() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING){
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Cannot set short frame while status is running\n");
ret=FAIL;
}
@ -2077,6 +2096,10 @@ int slsReceiverTCPIPInterface::set_read_frequency(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set receiver frequency mode while receiver not idle\n");
ret = FAIL;
}
/*
else if((receiverBase->getStatus()==RUNNING) && (index >= 0)){
ret = FAIL;
@ -2142,6 +2165,10 @@ int slsReceiverTCPIPInterface::enable_file_write(){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set file write mode while receiver not idle\n");
ret = FAIL;
}
else{
if(enable >= 0)
receiverBase->setFileWriteEnable(enable);
@ -2213,7 +2240,12 @@ int slsReceiverTCPIPInterface::start_readout(){
if (receiverBase == NULL){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}else{
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not start receiver readout while receiver not idle\n");
ret = FAIL;
}
else{
receiverBase->startReadout();
retval = receiverBase->getStatus();
if((retval == TRANSMITTING) || (retval == RUN_FINISHED) || (retval == IDLE))
@ -2269,6 +2301,10 @@ int slsReceiverTCPIPInterface::set_timer() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set timer while receiver not idle\n");
ret = FAIL;
}
else{
if(index[0] == FRAME_PERIOD){
if(index[1]>=0){
@ -2344,7 +2380,7 @@ int slsReceiverTCPIPInterface::enable_compression() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING){
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Cannot enable/disable compression while status is running\n");
ret=FAIL;
}
@ -2413,6 +2449,10 @@ int slsReceiverTCPIPInterface::set_detector_hostname() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set detector hostname while receiver not idle\n");
ret = FAIL;
}
else{
receiverBase->initialize(hostname);
retval = receiverBase->getDetectorHostname();
@ -2494,7 +2534,12 @@ int slsReceiverTCPIPInterface::set_dynamic_range() {
if (receiverBase == NULL){
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}else{
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set dynamic range while receiver not idle\n");
ret = FAIL;
}
else{
if(dr > 0){
ret = receiverBase->setDynamicRange(dr);
if(ret == FAIL)
@ -2571,6 +2616,10 @@ int slsReceiverTCPIPInterface::enable_overwrite() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set overwrite mode while receiver not idle\n");
ret = FAIL;
}
else{
if(index >= 0)
receiverBase->setOverwriteEnable(index);
@ -2634,6 +2683,10 @@ int slsReceiverTCPIPInterface::enable_tengiga() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Can not set up 1Giga/10Giga mode while receiver not idle\n");
ret = FAIL;
}
else{
if(val >= 0)
ret = receiverBase->setTenGigaEnable(val);
@ -2699,7 +2752,7 @@ int slsReceiverTCPIPInterface::set_fifo_depth() {
strcpy(mess,SET_RECEIVER_ERR_MESSAGE);
ret=FAIL;
}
else if(receiverBase->getStatus()==RUNNING){
else if(receiverBase->getStatus()==RUNNING || receiverBase->getStatus()==TRANSMITTING){
strcpy(mess,"Cannot set/get fifo depth while status is running\n");
ret=FAIL;
}