This commit is contained in:
sala 2014-09-22 14:09:31 +02:00
parent e5864d4343
commit f05248cd32
2 changed files with 153 additions and 127 deletions

View File

@ -44,7 +44,9 @@ UDPRESTImplementation::UDPRESTImplementation(){
}
UDPRESTImplementation::~UDPRESTImplementation(){}
UDPRESTImplementation::~UDPRESTImplementation(){
delete rest;
}
void UDPRESTImplementation::configure(map<string, string> config_map){
@ -166,6 +168,10 @@ int UDPRESTImplementation::getFramesCaught(){
int UDPRESTImplementation::getTotalFramesCaught(){
FILE_LOG(logDEBUG) << __AT__ << " called";
if (packetsPerFrame == 0){
FILE_LOG(logWARNING) << __AT__ << " packetsPerFrame is 0!!!";
return 0;
}
return (totalPacketsCaught/packetsPerFrame);
}
@ -216,7 +222,6 @@ int UDPRESTImplementation::getFileIndex(){
int UDPRESTImplementation::setFileIndex(int i){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
if(i>=0)
fileIndex = i;
@ -390,56 +395,12 @@ bool UDPRESTImplementation::getDataCompression(){
}
int UDPRESTImplementation::enableDataCompression(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "Data compression ";
if(enable)
cout << "enabled" << endl;
else
cout << "disabled" << endl;
#ifdef MYROOT1
cout << " WITH ROOT" << endl;
#else
cout << " WITHOUT ROOT" << endl;
#endif
//delete filter for the current number of threads
deleteFilter();
dataCompression = enable;
pthread_mutex_lock(&status_mutex);
writerthreads_mask = 0x0;
pthread_mutex_unlock(&(status_mutex));
createWriterThreads(true);
if(enable)
numWriterThreads = MAX_NUM_WRITER_THREADS;
else
numWriterThreads = 1;
if(createWriterThreads() == FAIL){
cout << "ERROR: Could not create writer threads" << endl;
return FAIL;
}
setThreadPriorities();
if(enable)
setupFilter();
FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing";
return OK;
}
/*other functions*/
@ -462,6 +423,8 @@ void UDPRESTImplementation::deleteFilter(){
void UDPRESTImplementation::setupFilter(){
//LEO: check
FILE_LOG(logDEBUG) << __AT__ << " called";
double hc = 0;
double sigma = 5;
@ -728,7 +691,6 @@ int UDPRESTImplementation::createUDPSockets(){
int UDPRESTImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << "called";
FILE_LOG(logDEBUG) << __AT__ << "doing nothing";
std::string answer;
int code = rest->get_json("state", &answer);
@ -742,7 +704,7 @@ int UDPRESTImplementation::shutDownUDPSockets(){
code = rest->get_json("state", &answer);
std::cout << answer << std::endl;
status = slsReceiverDefs::IDLE;
status = slsReceiverDefs::RUN_FINISHED;
@ -1744,7 +1706,11 @@ void UDPRESTImplementation::startFrameIndices(int ithread){
}
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){
FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing";
};
/*
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){
FILE_LOG(logDEBUG) << __AT__ << " called";
@ -1754,7 +1720,7 @@ int i;
cerr << ithread << " recvfrom() failed:"<<endl;
#endif
if(status != TRANSMITTING){
cout << ithread << " *** shoule never be here********* status not transmitting***********************"<<endl;/**/
cout << ithread << " *** shoule never be here********* status not transmitting***********************"<<endl;
fifoFree[ithread]->push(buffer[ithread]);
exit(-1);
}
@ -1815,7 +1781,7 @@ int i;
}
}
*/

View File

@ -125,7 +125,8 @@ void UDPStandardImplementation::initializeMembers(){
}
UDPStandardImplementation::UDPStandardImplementation(){
UDPStandardImplementation::UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting" ;
@ -182,7 +183,8 @@ UDPStandardImplementation::UDPStandardImplementation(){
UDPStandardImplementation::~UDPStandardImplementation(){
UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
createListeningThreads(true);
createWriterThreads(true);
deleteMembers();
@ -191,7 +193,8 @@ UDPStandardImplementation::~UDPStandardImplementation(){
void UDPStandardImplementation::deleteMembers(){
void UDPStandardImplementation::deleteMembers(){ FILE_LOG(logDEBUG) << __AT__ << " called";
//kill threads
if(thread_started){
createListeningThreads(true);
@ -224,7 +227,8 @@ void UDPStandardImplementation::deleteMembers(){
int UDPStandardImplementation::setDetectorType(detectorType det){
int UDPStandardImplementation::setDetectorType(detectorType det){ FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "Setting Receiver Type " << endl;
deleteMembers();
@ -451,6 +455,9 @@ int UDPStandardImplementation::setEnableOverwrite(int i){
/*other parameters*/
slsReceiverDefs::runStatus UDPStandardImplementation::getStatus() const{
FILE_LOG(logDEBUG) << __AT__ << " called, status: " << status;
return status;
}
@ -465,12 +472,14 @@ char *UDPStandardImplementation::getDetectorHostname() const{
return (char*)detHostname;
}
void UDPStandardImplementation::setEthernetInterface(char* c){
void UDPStandardImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBUG) << __AT__ << " called";
strcpy(eth,c);
}
void UDPStandardImplementation::setUDPPortNo(int p){
void UDPStandardImplementation::setUDPPortNo(int p){ FILE_LOG(logDEBUG) << __AT__ << " called";
for(int i=0;i<numListeningThreads;i++){
server_port[i] = p+i;
}
@ -482,7 +491,8 @@ int UDPStandardImplementation::getNumberOfFrames() const {
}
int32_t UDPStandardImplementation::setNumberOfFrames(int32_t fnum){
int32_t UDPStandardImplementation::setNumberOfFrames(int32_t fnum){ FILE_LOG(logDEBUG) << __AT__ << " called";
if(fnum >= 0)
numberOfFrames = fnum;
@ -494,7 +504,8 @@ int UDPStandardImplementation::getScanTag() const{
}
int32_t UDPStandardImplementation::setScanTag(int32_t stag){
int32_t UDPStandardImplementation::setScanTag(int32_t stag){ FILE_LOG(logDEBUG) << __AT__ << " called";
if(stag >= 0)
scanTag = stag;
@ -506,7 +517,8 @@ int UDPStandardImplementation::getDynamicRange() const{
return dynamicRange;
}
int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){
int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){ FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "Setting Dynamic Range" << endl;
int olddr = dynamicRange;
@ -566,7 +578,8 @@ int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){
int UDPStandardImplementation::setShortFrame(int i){
int UDPStandardImplementation::setShortFrame(int i){ FILE_LOG(logDEBUG) << __AT__ << " called";
shortFrame=i;
if(shortFrame!=-1){
@ -596,7 +609,8 @@ int UDPStandardImplementation::setShortFrame(int i){
}
int UDPStandardImplementation::setNFrameToGui(int i){
int UDPStandardImplementation::setNFrameToGui(int i){ FILE_LOG(logDEBUG) << __AT__ << " called";
if(i>=0){
nFrameToGui = i;
setupFifoStructure();
@ -606,7 +620,8 @@ int UDPStandardImplementation::setNFrameToGui(int i){
int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){
int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){ FILE_LOG(logDEBUG) << __AT__ << " called";
if(index >= 0){
if(index != acquisitionPeriod){
@ -618,9 +633,11 @@ int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){
}
bool UDPStandardImplementation::getDataCompression(){return dataCompression;}
bool UDPStandardImplementation::getDataCompression(){ FILE_LOG(logDEBUG) << __AT__ << " called";
return dataCompression;}
int UDPStandardImplementation::enableDataCompression(bool enable){ FILE_LOG(logDEBUG) << __AT__ << " called";
int UDPStandardImplementation::enableDataCompression(bool enable){
cout << "Data compression ";
if(enable)
cout << "enabled" << endl;
@ -673,7 +690,8 @@ int UDPStandardImplementation::enableDataCompression(bool enable){
/*other functions*/
void UDPStandardImplementation::deleteFilter(){
void UDPStandardImplementation::deleteFilter(){ FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
cmSub=NULL;
@ -691,6 +709,8 @@ void UDPStandardImplementation::deleteFilter(){
void UDPStandardImplementation::setupFilter(){
FILE_LOG(logDEBUG) << __AT__ << " called";
double hc = 0;
double sigma = 5;
int sign = 1;
@ -727,6 +747,7 @@ void UDPStandardImplementation::setupFilter(){
//LEO: it is not clear to me..
void UDPStandardImplementation::setupFifoStructure(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int64_t i;
int oldn = numJobsPerThread;
@ -816,6 +837,8 @@ void UDPStandardImplementation::setupFifoStructure(){
/** acquisition functions */
void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data
if (guiData == NULL)
guiData = latestData;
@ -850,6 +873,8 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){
FILE_LOG(logDEBUG) << __AT__ << " called";
//random read when gui not ready
if((!nFrameToGui) && (!guiData)){
@ -905,6 +930,7 @@ void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum,
int UDPStandardImplementation::createUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//if eth is mistaken with ip address
@ -950,6 +976,8 @@ int UDPStandardImplementation::createUDPSockets(){
int UDPStandardImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called";
for(int i=0;i<numListeningThreads;i++){
if(udpSocket[i]){
udpSocket[i]->ShutDownSocket();
@ -965,6 +993,8 @@ int UDPStandardImplementation::shutDownUDPSockets(){
// TODO: add a destroyListeningThreads
int UDPStandardImplementation::createListeningThreads(bool destroy){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
void* status;
@ -1022,6 +1052,8 @@ int UDPStandardImplementation::createListeningThreads(bool destroy){
int UDPStandardImplementation::createWriterThreads(bool destroy){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
void* status;
@ -1083,6 +1115,8 @@ int UDPStandardImplementation::createWriterThreads(bool destroy){
void UDPStandardImplementation::setThreadPriorities(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
//assign priorities
struct sched_param tcp_param, listen_param, write_param;
@ -1120,6 +1154,7 @@ void UDPStandardImplementation::setThreadPriorities(){
int UDPStandardImplementation::setupWriter(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//reset writing thread variables
packetsInFile=0;
@ -1203,6 +1238,8 @@ int UDPStandardImplementation::setupWriter(){
int UDPStandardImplementation::createCompressionFile(int ithr, int iframe){
FILE_LOG(logDEBUG) << __AT__ << " called";
#ifdef MYROOT1
char temp[MAX_STR_LENGTH];
//create file name for gui purposes, and set up acquistion parameters
@ -1231,6 +1268,8 @@ int UDPStandardImplementation::createCompressionFile(int ithr, int iframe){
int UDPStandardImplementation::createNewFile(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int gt = getFrameIndex();
if(gt==-1) gt=0;
//create file name
@ -1295,6 +1334,8 @@ int UDPStandardImplementation::createNewFile(){
void UDPStandardImplementation::closeFile(int ithr){
FILE_LOG(logDEBUG) << __AT__ << " called";
#ifdef VERBOSE
cout << "In closeFile for thread " << ithr << endl;
#endif
@ -1352,6 +1393,8 @@ void UDPStandardImplementation::closeFile(int ithr){
int UDPStandardImplementation::startReceiver(char message[]){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
@ -1420,6 +1463,7 @@ int UDPStandardImplementation::startReceiver(char message[]){
int UDPStandardImplementation::stopReceiver(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//#ifdef VERBOSE
@ -1450,6 +1494,7 @@ int UDPStandardImplementation::stopReceiver(){
void UDPStandardImplementation::startReadout(){
FILE_LOG(logDEBUG) << __AT__ << " called";
//#ifdef VERBOSE
cout << "Start Receiver Readout" << endl;
@ -1473,6 +1518,7 @@ void UDPStandardImplementation::startReadout(){
void* UDPStandardImplementation::startListeningThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPStandardImplementation*)this_pointer)->startListening();
return this_pointer;
@ -1481,6 +1527,7 @@ void* UDPStandardImplementation::startListeningThread(void* this_pointer){
void* UDPStandardImplementation::startWritingThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPStandardImplementation*)this_pointer)->startWriting();
return this_pointer;
}
@ -1491,6 +1538,7 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){
int UDPStandardImplementation::startListening(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int ithread = currentListeningThreadIndex;
#ifdef VERYVERBOSE
cout << "In startListening() " << endl;
@ -1692,6 +1740,8 @@ int UDPStandardImplementation::startListening(){
int UDPStandardImplementation::startWriting(){
FILE_LOG(logDEBUG) << __AT__ << " called";
int ithread = currentWriterThreadIndex;
#ifdef VERYVERBOSE
cout << ithread << "In startWriting()" <<endl;
@ -1885,6 +1935,7 @@ int loop;
void UDPStandardImplementation::startFrameIndices(int ithread){
FILE_LOG(logDEBUG) << __AT__ << " called";
if (myDetectorType == EIGER)
//add currframenum later in this method for scans
@ -1919,6 +1970,9 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
void UDPStandardImplementation::stopListening(int ithread, int rc, int &pc, int &t){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i;
#ifdef VERYVERBOSE
@ -1997,6 +2051,8 @@ int i;
void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
FILE_LOG(logDEBUG) << __AT__ << " called";
int i,j;
#ifdef VERYDEBUG
cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl;
@ -2064,6 +2120,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer[]){
void UDPStandardImplementation::writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum){
FILE_LOG(logDEBUG) << __AT__ << " called";
int packetsToSave, offset,lastpacket;
uint32_t tempframenum = framenum;
@ -2166,6 +2224,7 @@ void UDPStandardImplementation::writeToFile_withoutCompression(char* buf,int num
void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){
FILE_LOG(logDEBUG) << __AT__ << " called";
#if defined(MYROOT1) && defined(ALLFILE_DEBUG)
writeToFile_withoutCompression(wbuf[0], numpackets,currframenum);
@ -2265,6 +2324,7 @@ void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer
int UDPStandardImplementation::enableTenGiga(int enable){
FILE_LOG(logDEBUG) << __AT__ << " called";
cout << "Enabling 10Gbe to" << enable << endl;