additional change

This commit is contained in:
Dhanya Maliakal 2015-10-08 17:09:43 +02:00
parent a3e88f96d6
commit e915245c10
5 changed files with 297 additions and 437 deletions

View File

@ -111,7 +111,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
uint64_t getFramesCaught() const;
/**
* Get Current Frame Index Caught for an entire acquisition (including all scans)
* Get Current Frame Index for an entire acquisition (including all scans)
* @return current frame index (represents all scans too)
*/
int64_t getAcquisitionIndex() const;
@ -372,8 +372,8 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
void startReadout();
/**
* shuts down the udp sockets
* \returns OK or FAIL
* Shuts down and deletes UDP Sockets
* @return OK or FAIL
*/
int shutDownUDPSockets();

View File

@ -171,7 +171,7 @@ class UDPInterface {
virtual uint64_t getFramesCaught() const = 0;
/**
* Get Current Frame Index Caught for an entire acquisition (including all scans)
* Get Current Frame Index for an entire acquisition (including all scans)
* @return current frame index (represents all scans too) or -1 if no packets caught
*/
virtual int64_t getAcquisitionIndex() const = 0;
@ -430,8 +430,8 @@ class UDPInterface {
virtual void startReadout() = 0;
/**
* shuts down the udp sockets
* \returns OK or FAIL
* Shuts down and deletes UDP Sockets
* @return OK or FAIL
*/
virtual int shutDownUDPSockets() = 0;

View File

@ -57,8 +57,6 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* They access local cache of configuration or detector parameters *******
*************************************************************************/
//***acquisition count parameters***
/*************************************************************************
* Setters ***************************************************************
@ -154,14 +152,14 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
*/
int startReceiver(char *c=NULL);
/**
* Shuts down and deletes UDP Sockets
* @return OK or FAIL
*/
int shutDownUDPSockets();
private:
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
//**initial parameters***
/**
@ -227,6 +225,13 @@ private:
*/
int createUDPSockets();
/**
* Initializes writer variables and creates the first file
* also does the startAcquisitionCallBack
* @return OK or FAIL
*/
int setupWriter();
//**detector parameters***
@ -277,19 +282,27 @@ private:
/** Footer offset from start of Packet*/
int footerOffset;
//***File parameters***
/** Maximum Packets Per File **/
int maxPacketsPerFile;
/** If file created successfully for all Writer Threads */
bool fileCreateSuccess;
//***acquisition indices parameters***
/** Frame Number of First Frame of an Acquisition */
//***acquisition indices/count parameters***
/** Frame Number of First Frame of an entire Acquisition (including all scans) */
uint64_t startAcquisitionIndex;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint64_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint64_t frameIndex;
/** Current Frame Number */
uint64_t currentFrameNumber;
@ -302,6 +315,19 @@ private:
/** Total Frame Count listened to by listening threads */
int totalListeningFrameCount[MAX_NUMBER_OF_LISTENING_THREADS];
/** Pckets currently in current file, starts new file when it reaches max */
uint32_t packetsInFile;
/** Number of Missing Packets per buffer*/
uint32_t numMissingPackets;
/** Total Number of Missing Packets in acquisition*/
uint32_t numTotMissingPackets;
/** Number of Missing Packets in file */
uint32_t numTotMissingPacketsInFile;
@ -318,15 +344,37 @@ private:
/** Circular fifo to point to address already written and freed, to be reused */
CircularFifo<char>* fifoFree[MAX_NUMBER_OF_LISTENING_THREADS];
/** UDP Sockets - Detector to Receiver */
genericSocket* udpSocket[MAX_NUMBER_OF_LISTENING_THREADS];
/** File Descriptor */
FILE *sfilefd;
/** Number of Jobs Per Buffer */
int numberofJobsPerBuffer;
/** Fifo Depth */
uint32_t fifoSize;
/** Current Frame copied for Gui */
//***receiver to GUI parameters***
/** Current Frame copied for GUI */
char* latestData;
/** If Data to be sent to GUI is ready */
bool guiDataReady;
/** Pointer to data to be sent to GUI */
char* guiData;
/** Pointer to file name to be sent to GUI */
char guiFileName[MAX_STR_LENGTH];
/** Semaphore to synchronize Writer and GuiReader threads*/
sem_t writerGuiSemaphore;
//***general and listening thread parameters***
@ -382,7 +430,6 @@ private:
//***filter parameters***
/** Common Mode Subtraction Enable FIXME: Always false, only moench uses, Ask Anna */
bool commonModeSubtractionEnable;
@ -404,6 +451,12 @@ private:
pthread_mutex_t status_mutex;
//***callback***
/** The action which decides what the user and default responsibilities to save data are
* 0 raw data ready callback takes care of open,close,write file
* 1 callback writes file, we have to open, close it
* 2 we open, close, write file, callback does not do anything */
int cbAction;
@ -422,89 +475,6 @@ private:
/**
* Set receiver type
* @param det detector type
* Returns success or FAIL
*/
int setDetectorType(detectorType det);
//Frame indices and numbers caught
/**
* Returns the frame index at start of entire acquisition (including all scans)
*/
//uint32_t getStartAcquisitionIndex();
/**
* Returns if acquisition started
*/
//bool getAcquistionStarted();
/**
* Returns the frame index at start of each real time acquisition (eg. for each scan)
*/
//uint32_t getStartFrameIndex();
/**
* Returns current Frame Index for each real time acquisition (eg. for each scan)
*/
//uint32_t getFrameIndex();
/**
* Returns if measurement started
*/
//bool getMeasurementStarted();
/**
* Resets the Total Frames Caught
* This is how the receiver differentiates between entire acquisitions
* Returns 0
*/
//void resetTotalFramesCaught();
//other parameters
/**
* abort acquisition with minimum damage: close open files, cleanup.
* does nothing if state already is 'idle'
*/
void abort() {};
/**
* Returns status of receiver: idle, running or error
*/
runStatus getStatus() const;
/**
* Set detector hostname
* @param c hostname
*/
void setDetectorHostname(const char *detectorHostName);
/**
* enable 10Gbe
@param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out
\returns enable for 10Gbe
*/
int enableTenGiga(int enable = -1);
//other functions
/**
* Returns the buffer-current frame read by receiver
* @param c pointer to current file name
@ -520,12 +490,6 @@ private:
*/
void closeFile(int ithr = -1);
/**
* Starts Receiver - starts to listen for packets
* @param message is the error message if there is an error
* Returns success
*/
int startReceiver(char message[]);
/**
* Stops Receiver - stops listening for packets
@ -538,21 +502,9 @@ private:
*/
void startReadout();
/**
* shuts down the udp sockets
* \returns if success or fail
*/
int shutDownUDPSockets();
private:
/*
void not_implemented(string method_name){
std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl;
};
*/
/**
@ -562,12 +514,6 @@ private:
void copyFrameToGui(char* startbuf[], char* buf=NULL);
/**
* initializes variables and creates the first file
* also does the startAcquisitionCallBack
* \returns FAIL or OK
*/
int setupWriter();
/**
* Creates new tree and file for compression
@ -694,42 +640,18 @@ private:
const static uint16_t missingPacketValue = 0xFFFF;
/** UDP Socket between Receiver and Detector */
genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS];
/** Complete File name */
/** Complete File name */
char savefilename[MAX_STR_LENGTH];
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint32_t frameIndex;
/** Pckets currently in current file, starts new file when it reaches max */
uint32_t packetsInFile;
/** Number of missing packets in acquisition*/
uint32_t numTotMissingPackets;
/** Number of missing packets in file (sometimes packetsinFile is incorrect due to padded packets for eiger)*/
uint32_t numTotMissingPacketsInFile;
/** Number of missing packets per buffer*/
uint32_t numMissingPackets;
/** Previous Frame number from buffer */
int prevframenum;
/** gui data ready */
int guiDataReady;
/** points to the data to send to gui */
char* guiData;
/** points to the filename to send to gui */
char* guiFileName;
/** OK if file created was successful */
int ret_createfile;
// TODO: not properly sure where to put these...
/** structure of an eiger image header*/
@ -738,8 +660,7 @@ private:
//semaphores
/** semaphore to synchronize writer and guireader threads */
sem_t smp;
//mutex
/** guiDataReady mutex */
@ -751,8 +672,6 @@ private:
/** mutex for writing data to file */
pthread_mutex_t write_mutex;
/** File Descriptor */
FILE *sfilefd;
//filter
@ -766,11 +685,6 @@ private:
#endif
/** The action which decides what the user and default responsibilites to save data are
* 0 raw data ready callback takes care of open,close,write file
* 1 callback writes file, we have to open, close it
* 2 we open, close, write file, callback does not do anything */
int cbAction;
public:

View File

@ -286,10 +286,7 @@ int UDPBaseImplementation::setDataCompressionEnable(const bool b){
void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(bottomEnable)
udpPortNum[1] = i;
else
udpPortNum[0] = i;
udpPortNum[0] = i;
FILE_LOG(logINFO) << "udpPortNum[0]:" << udpPortNum[0];
}

View File

@ -58,16 +58,22 @@ void UDPStandardImplementation::deleteMembers(){
FILE_LOG(logDEBUG) << __AT__ << " starting";
cout << "Info: Deleting member pointers" << endl;
shutDownUDPSockets();
closeFile();
//filter
deleteFilter();
for(int i=0; i<numberofListeningThreads; i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;}
}
if(latestData) {delete[] latestData; latestData = NULL;}
guiData = NULL;
//kill threads
if(threadStarted){
createListeningThreads(true);
createWriterThreads(true);
}
//shutdownudpsockets
//close file
if(latestData) {delete[] latestData; latestData = NULL;}
}
void UDPStandardImplementation::deleteFilter(){
@ -109,15 +115,22 @@ void UDPStandardImplementation::initializeMembers(){
//***file parameters***
maxPacketsPerFile = 0;
fileCreateSuccess = false;
//***acquisition indices parameters***
startAcquisitionIndex = 0;
startFrameIndex = 0;
frameIndex = 0;
currentFrameNumber = 0;
acqStarted = false;
measurementStarted = false;
for(int i = 0; i < numberofListeningThreads; ++i)
totalListeningFrameCount[i] = 0;
packetsInFile = 0;
numMissingPackets = 0;
numTotMissingPackets = 0;
numTotMissingPacketsInFile = 0;
//***receiver parameters***
for(int i=0; i < MAX_NUMBER_OF_LISTENING_THREADS; i++){
@ -125,10 +138,17 @@ void UDPStandardImplementation::initializeMembers(){
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
udpSocket[i] = NULL;
}
sfilefd = NULL;
numberofJobsPerBuffer = -1;
fifoSize = 0;
//***receiver to GUI parameters***
latestData = NULL;
guiDataReady = false;
guiData = NULL;
strcpy(guiFileName,"");
//***general and listening thread parameters***
threadStarted = false;
@ -154,6 +174,9 @@ void UDPStandardImplementation::initializeMembers(){
//***mutex***
pthread_mutex_init(&status_mutex,NULL);
//***callback***
cbAction = DO_EVERYTHING;
}
@ -452,14 +475,106 @@ int UDPStandardImplementation::setupFifoStructure(){
int UDPStandardImplementation::createUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//switching ports if bottom enabled
int port[2];
port = udpPortNum;
if(bottomEnable){
port[0] = udpPortNum[1];
port[1] = udpPortNum[0];
}
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL)
strcpy(eth,"");
shutDownUDPSockets();
//if no eth, listen to all
if(!strlen(eth)){
cout << "Warning: eth is empty. Listening to all"<<endl;
for(int i=0;i<numberofListeningThreads;i++)
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize);
}
//normal socket
else{
cout << "Info: eth:" << eth << endl;
for(int i=0;i<numberofListeningThreads;i++)
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth);
}
//error
for(int i=0;i<numberofListeningThreads;i++){
int iret = udpSocket[i]->getErrorStatus();
if(!iret){
cout << "Info: UDP port opened at port " << port[i] << endl;
}else{
#ifdef VERBOSE
cprintf(BG_RED,"Error: Could not create UDP socket on port %d error: %d\n", port[i], iret);
#endif
shutDownUDPSockets();
return FAIL;
}
}
cout << "Info: UDP socket(s) created successfully." << endl;
cout << "Info: Listener Ready ..." << endl;
return OK;
}
int UDPStandardImplementation::setupWriter(){
FILE_LOG(logDEBUG) << __AT__ << " starting";
//acquisition start call back returns enable write
cbAction = DO_EVERYTHING;
if (startAcquisitionCallBack)
cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition);
if(cbAction < DO_EVERYTHING){
cout << "Info: Call back activated. Data saving must be taken care of by user in call back." << endl;
if (rawDataReadyCallBack)
cout << "Info: Data Write has been defined externally" << endl;
}else if(!fileWriteEnable)
cout << "Info: Data will not be saved" << endl;
//creating first file
//setting all value to 1
pthread_mutex_lock(&status_mutex);
for(int i=0; i<numberofWriterThreads; i++)
createFileMask|=(1<<i);
pthread_mutex_unlock(&status_mutex);
for(int i=0; i<numberofWriterThreads; i++){
FILE_LOG(logDEBUG4) << i << " Going to post 1st semaphore" << endl;
sem_post(&writerSemaphore[i]);
}
//wait till its mask becomes zero(all created)
while(createFileMask){
FILE_LOG(logDEBUG4) << "*" << flush;
usleep(5000);
}
if(dataCompressionEnable){
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(fileCreateSuccess != FAIL)
fileCreateSuccess = createNewFile();
#endif
}
cout << "Info: Successfully created file(s)" << endl;
cout << "Info: Writer Ready ..." << endl;
return fileCreateSuccess;
}
void UDPStandardImplementation::configure(map<string, string> config_map){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -869,25 +984,113 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){
cout << "Info: Starting Receiver" << endl;
//RESET
//reset measurement variables
measurementStarted = false;
startFrameIndex = 0;
frameIndex = 0;
if(!acqStarted)
currentFrameNumber = 0; //has to be zero to add to startframeindex for each scan
for(int i = 0; i < numberofListeningThreads; ++i)
totalListeningFrameCount[i] = 0;
packetsCaught = 0;
numMissingPackets = 0;
numTotMissingPackets = 0;
numTotMissingPacketsInFile = 0;
//reset file parameters
packetsInFile = 0;
if(sfilefd){
fclose(sfilefd);
sfilefd = NULL;
}
//reset gui variables
guiData = NULL;
guiDataReady=0;
strcpy(guiFileName,"");
//reset masks
pthread_mutex_lock(&status_mutex);
writerThreadsMask = 0x0;
createFileMask = 0x0;
fileCreateSuccess = false;
pthread_mutex_unlock(&status_mutex);
//Print Receiver Configuration
cout << "Info: ***Receiver Configuration***" << endl;
cout << "Info: Max Packets Per File:" << maxPacketsPerFile << endl;
cout << "Info: Data Compression has been " << stringEnable(dataCompressionEnable) << endl;
if(myDetectorType != EIGER)
cout << "Info: Number of Jobs Per Buffer: " << numberofJobsPerBuffer << endl;
if(FrameToGuiFrequency)
cout << "Info: Frequency of frames sent to gui" << FrameToGuiFrequency << endl;
//create UDP sockets
if(createUDPSockets() == FAIL){
strcpy(c,"Could not create UDP Socket(s).\n");
cout << endl;
cout << "Error: "<< c << endl;
return FAIL;
}
if(setupWriter() == FAIL){
//stop udp socket
shutDownUDPSockets();
sprintf(c,"Could not create file %s.\n",savefilename);
cout << endl;
cout << "Error: "<< c << endl;
return FAIL;
}
//For compression, done to give the gui some proper name instead of always the last file name
if(dataCompressionEnable)
sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex);
//initialize semaphore to synchronize between writer and gui reader threads
sem_init(&writerGuiSemaphore,1,0);
//status and thread masks
pthread_mutex_lock(&status_mutex);
status = RUNNING;
for(int i=0;i<numberofListeningThreads;i++)
listeningThreadsMask|=(1<<i);
for(int i=0;i<numberofWriterThreads;i++)
writerThreadsMask|=(1<<i);
pthread_mutex_unlock(&(status_mutex));
//start listening /writing
for(int i=0;i<numberofListeningThreads;i++)
sem_post(&listenSemaphore[i]);
for(int i=0; i < numberofWriterThreads; i++)
sem_post(&writerSemaphore[i]);
//usleep(5000000);
cout << "Info: Receiver Started." << endl;
cout << "Info: Status:" << status << endl;
return OK;
}
int UDPStandardImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "Info: Shutting down UDP Socket(s)" << endl;
for(int i=0;i<numberofListeningThreads;i++){
if(udpSocket[i]){
udpSocket[i]->ShutDownSocket();
delete udpSocket[i];
udpSocket[i] = NULL;
}
}
return OK;
}
@ -929,16 +1132,13 @@ int UDPStandardImplementation::startReceiver(char *c=NULL){
UDPStandardImplementation::UDPStandardImplementation(){
thread_started = 0;
eth = NULL;
latestData = NULL;
guiFileName = NULL;
tengigaEnable = 0;
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
@ -989,13 +1189,10 @@ void UDPStandardImplementation::initializeMembers(){
frameIndexNeeded = 0;
frameIndex = 0;
packetsCaught = 0;
totalPacketsCaught = 0;
packetsInFile = 0;
numTotMissingPackets = 0;
numTotMissingPacketsInFile = 0;
numMissingPackets = 0;
startAcquisitionIndex = 0;
acquisitionIndex = 0;
@ -1009,7 +1206,7 @@ void UDPStandardImplementation::initializeMembers(){
currframenum = 0;
prevframenum = 0;
guiDataReady = 0;
nFrameToGui = 0;
dataCompression = false;
numListeningThreads = 1;
@ -1017,18 +1214,15 @@ void UDPStandardImplementation::initializeMembers(){
thread_started = 0;
cbAction = DO_EVERYTHING;
tengigaEnable = 0;
for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = NULL;
}
eth = NULL;
guiFileName = NULL;
guiData = NULL;
sfilefd = NULL;
cmSub = NULL;
@ -1043,11 +1237,9 @@ void UDPStandardImplementation::initializeMembers(){
#endif
}
guiFileName = new char[MAX_STR_LENGTH];
eth = new char[MAX_STR_LENGTH];
strcpy(eth,"");
strcpy(detHostname,"");
strcpy(guiFileName,"");
strcpy(savefilename,"");
@ -1096,7 +1288,7 @@ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ <
shutDownUDPSockets();
if(eth) {delete [] eth; eth = NULL;}
if(latestData) {delete [] latestData; latestData = NULL;}
if(guiFileName) {delete [] guiFileName; guiFileName = NULL;}
for(int i=0;i<numListeningThreads;i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
@ -1117,12 +1309,9 @@ void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ <
//uint32_t UDPStandardImplementation::getStartFrameIndex(){return startFrameIndex;}
/*
uint32_t UDPStandardImplementation::getFrameIndex(){
if(!packetsCaught)
frameIndex=-1;
else
acquisitionIndex = currframenum - startAcquisitionIndex
frameIndex = currframenum - startFrameIndex;
return frameIndex;
}
*/
@ -1263,174 +1452,6 @@ cout << "copyframe" << endl;
int UDPStandardImplementation::createUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int port[2];
port[0] = server_port[0];
port[1] = server_port[1];
/** eiger specific */
if(bottom){
port[0] = server_port[1];
port[1] = server_port[0];
}
//if eth is mistaken with ip address
if (strchr(eth,'.')!=NULL)
strcpy(eth,"");
shutDownUDPSockets();
//if no eth, listen to all
if(!strlen(eth)){
cout<<"warning:eth is empty.listening to all"<<endl;
for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize);
}
}
//normal socket
else{
cout<<"eth:"<<eth<<endl;
for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth);
}
}
//error
int iret;
for(int i=0;i<numListeningThreads;i++){
iret = udpSocket[i]->getErrorStatus();
if(!iret){
cout << "UDP port opened at port " << port[i] << endl;
}else{
#ifdef VERBOSE
cprintf(BG_RED,"Could not create UDP socket on port %d error: %d\n", port[i], iret);
#endif
shutDownUDPSockets();
return FAIL;
}
}
return OK;
}
int UDPStandardImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
for(int i=0;i<numListeningThreads;i++){
if(udpSocket[i]){
udpSocket[i]->ShutDownSocket();
delete udpSocket[i];
udpSocket[i] = NULL;
}
}
return OK;
}
int UDPStandardImplementation::setupWriter(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//reset writing thread variables
packetsInFile=0;
numTotMissingPackets = 0;
numTotMissingPacketsInFile = 0;
numMissingPackets = 0;
packetsCaught=0;
frameIndex=0;
if(sfilefd) {cprintf(RED,"**FILE not closed!\n");fclose(sfilefd);sfilefd=NULL;}
guiData = NULL;
guiDataReady=0;
strcpy(guiFileName,"");
cbAction = DO_EVERYTHING;
pthread_mutex_lock(&status_mutex);
writerthreads_mask = 0x0;
createfile_mask = 0x0;
ret_createfile = OK;
pthread_mutex_unlock(&status_mutex);
//printouts
cout << "Max Packets Per File:" << maxPacketsPerFile << endl;
if (rawDataReadyCallBack)
cout << "Note: Data Write has been defined exernally" << endl;
if (dataCompression)
cout << "Data Compression is enabled with " << numJobsPerThread << " number of jobs per thread" << endl;
if(nFrameToGui)
cout << "Sending every " << nFrameToGui << "th frame to gui" << endl;
//acquisition start call back returns enable write
if (startAcquisitionCallBack)
cbAction=startAcquisitionCallBack(filePath,fileName,fileIndex,bufferSize,pStartAcquisition);
if(cbAction < DO_EVERYTHING)
cout << endl << "Note: Call back activated. Data saving must be taken care of by user in call back." << endl;
else if(enableFileWrite==0)
cout << endl << "Note: Data will not be saved" << endl;
//creating first file
//mask
pthread_mutex_lock(&status_mutex);
for(int i=0;i<numWriterThreads;i++)
createfile_mask|=(1<<i);
pthread_mutex_unlock(&status_mutex);
for(int i=0;i<numWriterThreads;i++){
#ifdef VERYDEBUG
cout << i << " gonna post 1st sem" << endl;
#endif
sem_post(&writersmp[i]);
}
//wait till its created
while(createfile_mask){
//cout<<"*"<<flush;
usleep(5000);
}
if (createfile_mask)
cout <<"*********************************************sooo weird:"<<createfile_mask<<endl;
if(dataCompression){
#if (defined(MYROOT1) && defined(ALLFILE_DEBUG)) || !defined(MYROOT1)
if(ret_createfile != FAIL){
int ret = createNewFile();
if(ret == FAIL)
ret_createfile = FAIL;
}
#endif
}
return ret_createfile;
}
int UDPStandardImplementation::createCompressionFile(int ithr, int iframe){
FILE_LOG(logDEBUG) << __AT__ << " called";
@ -1595,78 +1616,6 @@ void UDPStandardImplementation::closeFile(int ithr){
/**
* Pre:
* Post: eiger req. time for 32bit before acq start
* */
int UDPStandardImplementation::startReceiver(char message[]){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
// #ifdef VERBOSE
cout << "Starting Receiver" << endl;
//#endif
//reset listening thread variables
measurementStarted = false;
//should be set to zero as its added to get next start frame indices for scans for eiger
if(!acqStarted) currframenum = 0;
startFrameIndex = 0;
for(int i = 0; i < numListeningThreads; ++i)
totalListeningFrameCount[i] = 0;
//udp socket
if(createUDPSockets() == FAIL){
strcpy(message,"Could not create UDP Socket(s).\n");
cout << endl << message << endl;
return FAIL;
}
cout << "UDP socket(s) created successfully." << endl;
if(setupWriter() == FAIL){
//stop udp socket
shutDownUDPSockets();
sprintf(message,"Could not create file %s.\n",savefilename);
return FAIL;
}
cout << "Successfully created file(s)" << endl;
//done to give the gui some proper name instead of always the last file name
if(dataCompression)
sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex);
//initialize semaphore
sem_init(&smp,1,0);
//status
pthread_mutex_lock(&status_mutex);
status = RUNNING;
for(i=0;i<numListeningThreads;i++)
listeningthreads_mask|=(1<<i);
for(i=0;i<numWriterThreads;i++)
writerthreads_mask|=(1<<i);
pthread_mutex_unlock(&(status_mutex));
//start listening /writing
for(i=0;i<numListeningThreads;i++)
sem_post(&listensmp[i]);
for(i=0; i < numWriterThreads; ++i)
sem_post(&writersmp[i]);
//usleep(5000000);
cout << "Receiver Started.\nStatus:" << status << endl;
return OK;
}
/**
* Pre: status is running, semaphores have been instantiated,