beginning of reimplementation of the REST interface

This commit is contained in:
2014-09-09 15:46:58 +02:00
parent 5641101705
commit 59980a4b90
4 changed files with 18 additions and 189 deletions

View File

@ -14,7 +14,7 @@ DFLAGS= -g -DDACS_INT -DSLS_RECEIVER_UDP_FUNCTIONS
INCLUDES?= -I. -Iincludes -IMySocketTCP -IslsReceiver -IslsDetectorCalibration -I$(ASM) INCLUDES?= -I. -Iincludes -IMySocketTCP -IslsReceiver -IslsDetectorCalibration -I$(ASM)
#-IslsReceiverInterface #-IslsReceiverInterface
SRC_CLNT= MySocketTCP/MySocketTCP.cpp slsReceiver/UDPInterface.cpp slsReceiver/UDPBaseImplementation.cpp slsReceiver/UDPStandardImplementation.cpp slsReceiver/slsReceiverTCPIPInterface.cpp slsReceiver/slsReceiver.cpp slsReceiver/slsReceiverUsers.cpp includes/utilities.h SRC_CLNT= MySocketTCP/MySocketTCP.cpp slsReceiver/UDPInterface.cpp slsReceiver/UDPBaseImplementation.cpp slsReceiver/UDPStandardImplementation.cpp slsReceiver/UDPRESTImplementation.cpp slsReceiver/slsReceiverTCPIPInterface.cpp slsReceiver/slsReceiver.cpp slsReceiver/slsReceiverUsers.cpp includes/utilities.h
#slsReceiverInterface/receiverInterface.cpp #slsReceiverInterface/receiverInterface.cpp
#slsReceiver/slsReceiverUDPFunctions.cpp #slsReceiver/slsReceiverUDPFunctions.cpp

View File

@ -29,189 +29,19 @@ using namespace std;
UDPBaseImplementation::UDPBaseImplementation(): UDPBaseImplementation::UDPBaseImplementation(){}
thread_started(0),
eth(NULL),
latestData(NULL),
guiFileName(NULL),
guiFrameNumber(0),
tengigaEnable(0){
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
}
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
pRawDataReady = NULL;
initializeMembers();
//mutex
pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&progress_mutex,NULL);
pthread_mutex_init(&write_mutex,NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
sysctl -p
// from the manual
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000
*/
}
UDPBaseImplementation::~UDPBaseImplementation(){
createListeningThreads(true);
createWriterThreads(true);
deleteMembers();
}
UDPBaseImplementation::~UDPBaseImplementation(){}
void UDPBaseImplementation::deleteMembers(){ void UDPBaseImplementation::deleteMembers(){
//kill threads FILE_LOG(logWARNING) << "[WARNING] This is a base implementation, " << __func__ << " could have no effects.";
if(thread_started){
createListeningThreads(true);
createWriterThreads(true);
}
for(int i=0;i<numWriterThreads;i++){
if(singlePhotonDet[i]){
delete singlePhotonDet[i];
singlePhotonDet[i] = NULL;
}
if(receiverdata[i]){
delete receiverdata[i];
receiverdata[i] = NULL;
}
}
shutDownUDPSockets();
if(eth) {delete [] eth; eth = NULL;}
if(latestData) {delete [] latestData; latestData = NULL;}
if(guiFileName) {delete [] guiFileName; guiFileName = NULL;}
for(int i=0;i<numListeningThreads;i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;}
}
} }
void UDPBaseImplementation::initializeMembers(){ void UDPBaseImplementation::initializeMembers(){
myDetectorType = GENERIC; FILE_LOG(logWARNING) << "[WARNING] This is a base implementation, " << __func__ << " could have no effects.";
maxPacketsPerFile = 0;
enableFileWrite = 1;
overwrite = 1;
fileIndex = 0;
scanTag = 0;
frameIndexNeeded = 0;
acqStarted = false;
measurementStarted = false;
startFrameIndex = 0;
frameIndex = 0;
packetsCaught = 0;
totalPacketsCaught = 0;
packetsInFile = 0;
startAcquisitionIndex = 0;
acquisitionIndex = 0;
packetsPerFrame = 0;
frameIndexMask = 0;
packetIndexMask = 0;
frameIndexOffset = 0;
acquisitionPeriod = SAMPLE_TIME_IN_NS;
numberOfFrames = 0;
dynamicRange = 16;
shortFrame = -1;
currframenum = 0;
prevframenum = 0;
frameSize = 0;
bufferSize = 0;
onePacketSize = 0;
guiDataReady = 0;
nFrameToGui = 0;
fifosize = 0;
numJobsPerThread = -1;
dataCompression = false;
numListeningThreads = 1;
numWriterThreads = 1;
thread_started = 0;
currentListeningThreadIndex = -1;
currentWriterThreadIndex = -1;
for(int i=0;i<numWriterThreads;i++)
totalListeningFrameCount[i] = 0;
listeningthreads_mask = 0x0;
writerthreads_mask = 0x0;
killAllListeningThreads = 0;
killAllWritingThreads = 0;
cbAction = DO_EVERYTHING;
tengigaEnable = 0;
for(int i=0;i<numListeningThreads;i++){
udpSocket[i] = NULL;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
buffer[i] = NULL;
}
eth = NULL;
latestData = NULL;
guiFileName = NULL;
guiData = NULL;
guiFrameNumber = 0;
sfilefd = NULL;
cmSub = NULL;
//diff threads
for(int i=0;i<numWriterThreads;i++){
commonModeSubtractionEnable = false;
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
#ifdef MYROOT1
myTree[i] = (NULL);
myFile[i] = (NULL);
#endif
}
guiFileName = new char[MAX_STR_LENGTH];
eth = new char[MAX_STR_LENGTH];
strcpy(eth,"");
strcpy(detHostname,"");
strcpy(guiFileName,"");
strcpy(savefilename,"");
strcpy(filePath,"");
strcpy(fileName,"run");
//status
pthread_mutex_lock(&status_mutex);
status = IDLE;
pthread_mutex_unlock(&(status_mutex));
} }
@ -264,14 +94,11 @@ void UDPBaseImplementation::resetTotalFramesCaught(){
/*file parameters*/ /*file parameters*/
char* UDPBaseImplementation::getFilePath() const{ char* UDPBaseImplementation::getFilePath() const{
FILE_LOG(logWARNING) << "[WARNING] This is a base implementation, " << __func__ << " could have no effects.x" << endl; FILE_LOG(logWARNING) << "[WARNING] This is a base implementation, " << __func__ << " could have no effects.";
return (char*)filePath; return (char*)filePath;
} }
char* UDPBaseImplementation::setFilePath(const char c[]){ char* UDPBaseImplementation::setFilePath(const char c[]){
cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
/*
if(strlen(c)){ if(strlen(c)){
//check if filepath exists //check if filepath exists
struct stat st; struct stat st;
@ -279,10 +106,9 @@ char* UDPBaseImplementation::setFilePath(const char c[]){
strcpy(filePath,c); strcpy(filePath,c);
else{ else{
strcpy(filePath,""); strcpy(filePath,"");
cout << "FilePath does not exist:" << filePath << endl; FILE_LOG(logWARNING) << "FilePath does not exist:" << filePath;
} }
} }
*/
return getFilePath(); return getFilePath();
} }
@ -292,11 +118,11 @@ char* UDPBaseImplementation::getFileName() const{
} }
char* UDPBaseImplementation::setFileName(const char c[]){ char* UDPBaseImplementation::setFileName(const char c[]){
cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; //cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
/*
if(strlen(c)) if(strlen(c))
strcpy(fileName,c); strcpy(fileName,c);
*/
return getFileName(); return getFileName();
} }
@ -385,6 +211,7 @@ int32_t UDPBaseImplementation::setNumberOfFrames(int32_t fnum){
return getNumberOfFrames(); return getNumberOfFrames();
} }
int UDPBaseImplementation::getScanTag() const{ int UDPBaseImplementation::getScanTag() const{
return scanTag; return scanTag;
} }

View File

@ -30,12 +30,14 @@ using namespace std;
UDPInterface * UDPInterface::create(string receiver_type){ UDPInterface * UDPInterface::create(string receiver_type){
if (receiver_type == "standard") if (receiver_type == "standard"){
cout << "Starting " << receiver_type << endl;
return new UDPStandardImplementation(); return new UDPStandardImplementation();
}
//else if (receiver_type == "REST") //else if (receiver_type == "REST")
// return new UDPRESTImplementation(); // return new UDPRESTImplementation();
else{ else{
cout << "[ERROR] UDP interface not supported, using standard implementation" << endl; FILE_LOG(logWARNING) << "[ERROR] UDP interface not supported, using standard implementation";
return new UDPBaseImplementation(); return new UDPBaseImplementation();
} }
} }

View File

@ -104,7 +104,7 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
} }
if (success==OK){ if (success==OK){
cout << "SLS Receiver starting" << endl; cout << "SLS Receiver starting " << udp_interface_type << endl;
udp_interface = UDPInterface::create(udp_interface_type); udp_interface = UDPInterface::create(udp_interface_type);
tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no); tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no);
//tcp ip interface //tcp ip interface