From c62594c7abb4892efffe30f13835ea0c00fc473e Mon Sep 17 00:00:00 2001 From: Sala Leonardo Date: Tue, 9 Sep 2014 16:38:27 +0200 Subject: [PATCH] added REST and Standard implementations --- slsReceiverSoftware/slsReceiver/RestHelper.h | 295 +++ .../slsReceiver/UDPRESTImplementation.cpp | 2009 ++++++++++++++ .../slsReceiver/UDPRESTImplementation.h | 819 ++++++ .../slsReceiver/UDPStandardImplementation.cpp | 2330 +++++++++++++++++ .../slsReceiver/UDPStandardImplementation.h | 810 ++++++ 5 files changed, 6263 insertions(+) create mode 100644 slsReceiverSoftware/slsReceiver/RestHelper.h create mode 100644 slsReceiverSoftware/slsReceiver/UDPRESTImplementation.cpp create mode 100644 slsReceiverSoftware/slsReceiver/UDPRESTImplementation.h create mode 100644 slsReceiverSoftware/slsReceiver/UDPStandardImplementation.cpp create mode 100644 slsReceiverSoftware/slsReceiver/UDPStandardImplementation.h diff --git a/slsReceiverSoftware/slsReceiver/RestHelper.h b/slsReceiverSoftware/slsReceiver/RestHelper.h new file mode 100644 index 000000000..fc4056c43 --- /dev/null +++ b/slsReceiverSoftware/slsReceiver/RestHelper.h @@ -0,0 +1,295 @@ +/** + * @file RestHelper.h + * @author Leonardo Sala + * @date Tue Mar 25 09:28:19 2014 + * + * @brief + * + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "JsonBox/Value.h" + +#include +#include +#include +#include + +#define EIGER_DEBUG +#ifdef EIGER_DEBUG +#define DEBUG(x) do { std::cerr << "[DEBUG] " << x << std::endl; } while (0) +#else +#define DEBUG(x) +#endif + + +using namespace Poco::Net; +using namespace Poco; +using namespace std; + +class RestHelper { + public: + + RestHelper(int timeout=10, int n_tries=3){ + /** + * + * + * @param timeout default=10 + * @param n_tries default=3 + */ + + http_timeout = timeout; + n_connection_tries = n_tries; + } + + ~RestHelper(){}; + + + void set_connection_params(int timeout, int n_tries){ + http_timeout = timeout; + n_connection_tries = n_tries; + } + + + void get_connection_params(int *timeout, int *n_tries){ + *timeout = http_timeout; + *n_tries = n_connection_tries; + + } + + + void init(string hostname, int port){ + /** Initialize the RestHelper. Hostname and port parameters are not supposed to change. + * + * + * @param hostname FQDN of the host to connect to , e.g. www.iamfake.org, or sodoi.org + * @param port + * + * @return + */ + + //Check for http:// string + string proto_str = "http://"; + if( size_t found = hostname.find(proto_str) != string::npos ){ + char c1[hostname.size()-found-1]; + size_t length1 = hostname.copy(c1, hostname.size()-found-1, proto_str.size()); + c1[length1]='\0'; + hostname = c1; + } + + full_hostname = "http://"+hostname; + session = new HTTPClientSession(hostname,port ); + session->setKeepAliveTimeout( Timespan( http_timeout,0) ); + + }; + + + void init(string hostname_port){ + /** Initialize the RestHelper. Hostname_port parameters are not supposed to change. + * + * + * @param hostname FQDN and port of the host to connect to , e.g. www.iamfake.org:8080, or sodoi.org:1111. Default port is 8080 + * + * @return + */ + + //Check for http:// string + string proto_str = "http://"; + if( size_t found = hostname_port.find(proto_str) != string::npos ){ + char c1[hostname_port.size()-found-1]; + size_t length1 = hostname_port.copy(c1, hostname_port.size()-found-1, proto_str.size()); + c1[length1]='\0'; + hostname_port = c1; + } + + size_t found = hostname_port.rfind(":"); + char c1[ found ], c2[hostname_port.size()-found-1]; + string hostname; + size_t length1 = hostname_port.copy(c1, found); + + c1[length1]='\0'; + hostname = c1; + size_t length2 = hostname_port.copy(c2, found-1, found+1); + c2[length2]='\0'; + int port = atoi(c2); + + full_hostname = proto_str+hostname; + session = new HTTPClientSession(hostname,port ); + session->setKeepAliveTimeout( Timespan( http_timeout,0) ); + }; + + + int get_json(string request, string* answer){ + /** Retrieves a reply from the RESTful webservice. + * + * + * @param request Request without the hostname, e.g. if the full request would have been http://fake.org/fakemethod, request=fakemethod + * @param answer + * + * @return 0 if successful, -1 if failure happens. + */ + URI * uri = new URI(full_hostname+"/"+request); + string path(uri->getPathAndQuery()); + if (path.empty()) path = "/"; + + // send request + HTTPRequest req(HTTPRequest::HTTP_GET, path, HTTPMessage::HTTP_1_1); + req.setContentType("application/json\r\n"); + int code = send_request(session, req, answer); + delete uri; + return code; + }; + + + int get_json(string request, JsonBox::Value* json_value){ + /** + * + * + * @param request + * @param json_value + * + * @return + */ + URI *uri = new URI(full_hostname+"/"+request); + string path(uri->getPathAndQuery()); + if (path.empty()) path = "/"; + // send request + HTTPRequest req(HTTPRequest::HTTP_GET, path, HTTPMessage::HTTP_1_1); + req.setContentType("application/json\r\n"); + string answer; + int code = send_request(session, req, &answer); + if(code == 0 ) { + DEBUG("ANSWER " << answer ); + json_value->loadFromString(answer); + } + delete uri; + return code; + }; + + + int post_json(string request, string *answer, string request_body=""){ + /** + * + * + * @param request + * @param answer + * @param request_body Eventual arguments to the URL, e.g. action=login&name=mammamia + * + * @return + */ + //from: http://stackoverflow.com/questions/1499086/poco-c-net-ssl-how-to-post-https-request + URI *uri = new URI(full_hostname+"/"+request); + string path(uri->getPathAndQuery()); + if (path.empty()) path = "/"; + HTTPRequest req(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1 ); + req.setContentType("application/json\r\n"); + req.setContentLength( request.length() ); + + int code = send_request(session, req, answer, request_body); + delete uri; + return code; + } + + + int post_json(string request, JsonBox::Value* json_value, string request_body=""){ + /** + * + * + * @param request + * @param json_value + * @param request_body Eventual arguments to the URL, e.g. action=login&name=mammamia + * + * @return + */ + + URI *uri = new URI(full_hostname+"/"+request); + string path(uri->getPathAndQuery()); + if (path.empty()) path = "/"; + HTTPRequest req(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1 ); + //this does not work + //req.setContentType("application/json\r\n"); + //req.setContentLength( request.length() ); + string answer; + int code = send_request(session, req, &answer, request_body); + if(code==0){ + json_value->loadFromString(answer); + } + delete uri; + return code; + } + + + private: + + HTTPClientSession *session; + string full_hostname; + /// HTTP timeout in seconds, default is 8 + int http_timeout; + /// Number of connection tries + int n_connection_tries; + + + int send_request(HTTPClientSession *session, HTTPRequest &req, string *answer, string request_body=""){ + /** + * + * + * @param session + * @param req + * @param answer + * @param request_body + * + * @return + */ + + int n=0; + int code = -1; + while(nsendRequest( (req) ); + else{ + cout << request_body << endl; + ostream &os = session->sendRequest( req ) ; + os << request_body; + } + + HTTPResponse res; + istream &is = session->receiveResponse(res); + StreamCopier::copyToString(is, *answer); + code = res.getStatus(); + if (code != 200){ + cout << "HTTP ERROR " << res.getStatus() << ": " << res.getReason() << endl; + code = -1; + } + else + code = 0; + return code; + } + catch (exception& e){ + cout << "Exception connecting to "<< full_hostname << ": "<< e.what() << ", sleeping 5 seconds (" << n << "/"< // SIGINT +#include // stat +#include // socket(), bind(), listen(), accept(), shut down +#include // sock_addr_in, htonl, INADDR_ANY +#include // exit() +#include //set precision +#include //munmap + +#include +#include + +//#include "utilities.h" + +using namespace std; + + + +UDPRESTImplementation::UDPRESTImplementation() : isInitialized(false), status(slsReceiverDefs::ERROR) {} + + +UDPRESTImplementation::~UDPRESTImplementation(){} + + +void UDPRESTImplementation::initialize(const char *detectorHostName){ + + string name; + if (detectorHostName != NULL) + name = detectorHostName; + + if (name.empty()) { + FILE_LOG(logDEBUG) << "initialize(): can't initialize with empty string or NULL for detectorHostname"; + } else if (isInitialized == true) { + FILE_LOG(logDEBUG) << "initialize(): already initialized, can't initialize several times"; + } else { + FILE_LOG(logDEBUG) << "initialize(): initialize() with: detectorHostName=" << name; + strcpy(detHostname,detectorHostName); + //init_config.detectorHostname = name; + + //REST call - hardcoded + //RestHelper rest ; + rest->init(detHostname, 8080); + std::string answer; + int code = rest->get_json("status", &answer); + if (code != 0){ + //throw -1; + std::cout << "I SHOULD THROW AN EXCEPTION!!!" << std::endl; + } + else{ + isInitialized = true; + status = slsReceiverDefs::IDLE; + } + std::cout << "Answer: " << answer << std::endl; + + + /* + std::std::cout << string << std::endl; << "---- REST test 3: true, json object "<< std::endl; + JsonBox::Value json_value; + code = rest.get_json("status", &json_value); + std::cout << "JSON " << json_value["status"] << std::endl; + */ + } +} + + +int UDPRESTImplementation::setDetectorType(detectorType det){ + cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; + return OK; +} + + + +/*Frame indices and numbers caught*/ + +bool UDPRESTImplementation::getAcquistionStarted(){return acqStarted;}; + +bool UDPRESTImplementation::getMeasurementStarted(){return measurementStarted;}; + +int UDPRESTImplementation::getFramesCaught(){return (packetsCaught/packetsPerFrame);} + +int UDPRESTImplementation::getTotalFramesCaught(){return (totalPacketsCaught/packetsPerFrame);} + +uint32_t UDPRESTImplementation::getStartFrameIndex(){return startFrameIndex;} + +uint32_t UDPRESTImplementation::getFrameIndex(){ + if(!packetsCaught) + frameIndex=-1; + else + frameIndex = currframenum - startFrameIndex; + return frameIndex; +} + + +uint32_t UDPRESTImplementation::getAcquisitionIndex(){ + if(!totalPacketsCaught) + acquisitionIndex=-1; + else + acquisitionIndex = currframenum - startAcquisitionIndex; + return acquisitionIndex; +} + + +void UDPRESTImplementation::resetTotalFramesCaught(){ + acqStarted = false; + startAcquisitionIndex = 0; + totalPacketsCaught = 0; +} + + +/*file parameters*/ +int UDPRESTImplementation::getFileIndex(){ + return fileIndex; +} + +int UDPRESTImplementation::setFileIndex(int i){ + cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; + /* + if(i>=0) + fileIndex = i; + */ + return getFileIndex(); +} + + +int UDPRESTImplementation::setFrameIndexNeeded(int i){ + cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; + frameIndexNeeded = i; + return frameIndexNeeded; +} + + +/* +int UDPRESTImplementation::getEnableFileWrite() const{ + return enableFileWrite; +} + +int UDPRESTImplementation::setEnableFileWrite(int i){ + enableFileWrite=i; + return getEnableFileWrite(); +} + +int UDPRESTImplementation::getEnableOverwrite() const{ + return overwrite; +} + +int UDPRESTImplementation::setEnableOverwrite(int i){ + overwrite=i; + return getEnableOverwrite(); +} +*/ + + + + +/*other parameters*/ + +slsReceiverDefs::runStatus UDPRESTImplementation::getStatus() const{ + return status; +} + + + +/* +char *UDPRESTImplementation::getDetectorHostname() const{ + return (char*)detHostname; +} +*/ + +void UDPRESTImplementation::setEthernetInterface(char* c){ + strcpy(eth,c); +} + + +void UDPRESTImplementation::setUDPPortNo(int p){ + for(int i=0;i= 0) + numberOfFrames = fnum; + + return getNumberOfFrames(); +} +*/ +/* +int UDPRESTImplementation::getScanTag() const{ + return scanTag; +} +*/ + +/* +int32_t UDPRESTImplementation::setScanTag(int32_t stag){ + if(stag >= 0) + scanTag = stag; + + return getScanTag(); +} +*/ + +int32_t UDPRESTImplementation::setDynamicRange(int32_t dr){ + cout << "Setting Dynamic Range" << endl; + + int olddr = dynamicRange; + if(dr >= 0){ + dynamicRange = dr; + } + + return getDynamicRange(); +} + + + +int UDPRESTImplementation::setShortFrame(int i){ + shortFrame=i; + + if(shortFrame!=-1){ + bufferSize = GOTTHARD_SHORT_ONE_PACKET_SIZE; + frameSize = GOTTHARD_SHORT_BUFFER_SIZE; + maxPacketsPerFile = SHORT_MAX_FRAMES_PER_FILE * GOTTHARD_SHORT_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_SHORT_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_SHORT_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_SHORT_FRAME_INDEX_OFFSET; + + }else{ + onePacketSize = GOTTHARD_ONE_PACKET_SIZE; + bufferSize = GOTTHARD_BUFFER_SIZE; + frameSize = GOTTHARD_BUFFER_SIZE; + maxPacketsPerFile = MAX_FRAMES_PER_FILE * GOTTHARD_PACKETS_PER_FRAME; + packetsPerFrame = GOTTHARD_PACKETS_PER_FRAME; + frameIndexMask = GOTTHARD_FRAME_INDEX_MASK; + frameIndexOffset = GOTTHARD_FRAME_INDEX_OFFSET; + } + + + deleteFilter(); + if(dataCompression) + setupFilter(); + + return shortFrame; +} + + +int UDPRESTImplementation::setNFrameToGui(int i){ + if(i>=0){ + nFrameToGui = i; + setupFifoStructure(); + } + return nFrameToGui; +} + + + +int64_t UDPRESTImplementation::setAcquisitionPeriod(int64_t index){ + + if(index >= 0){ + if(index != acquisitionPeriod){ + acquisitionPeriod = index; + setupFifoStructure(); + } + } + return acquisitionPeriod; +} + + +bool UDPRESTImplementation::getDataCompression(){return dataCompression;} + +int UDPRESTImplementation::enableDataCompression(bool enable){ + cout << "Data compression "; + if(enable) + cout << "enabled" << endl; + else + cout << "disabled" << endl; +#ifdef MYROOT1 + cout << " WITH ROOT" << endl; +#else + cout << " WITHOUT ROOT" << endl; +#endif + //delete filter for the current number of threads + deleteFilter(); + + dataCompression = enable; + pthread_mutex_lock(&status_mutex); + writerthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + createWriterThreads(true); + + if(enable) + numWriterThreads = MAX_NUM_WRITER_THREADS; + else + numWriterThreads = 1; + + if(createWriterThreads() == FAIL){ + cout << "ERROR: Could not create writer threads" << endl; + return FAIL; + } + setThreadPriorities(); + + + if(enable) + setupFilter(); + + return OK; +} + + + + + + + + + + + + +/*other functions*/ + + +void UDPRESTImplementation::deleteFilter(){ + int i; + cmSub=NULL; + + for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); + +} + + + +//LEO: it is not clear to me.. +void UDPRESTImplementation::setupFifoStructure(){ + + int64_t i; + int oldn = numJobsPerThread; + + //if every nth frame mode + if(nFrameToGui) + numJobsPerThread = nFrameToGui; + + //random nth frame mode + else{ + if(!acquisitionPeriod) + i = SAMPLE_TIME_IN_NS; + else + i = SAMPLE_TIME_IN_NS/acquisitionPeriod; + if (i > MAX_JOBS_PER_THREAD) + numJobsPerThread = MAX_JOBS_PER_THREAD; + else if (i < 1) + numJobsPerThread = 1; + else + numJobsPerThread = i; + } + + //if same, return + if(oldn == numJobsPerThread) + return; + + if(myDetectorType == EIGER) + numJobsPerThread = 1; + + //otherwise memory too much if numjobsperthread is at max = 1000 + fifosize = GOTTHARD_FIFO_SIZE; + if(myDetectorType == MOENCH) + fifosize = MOENCH_FIFO_SIZE; + else if(myDetectorType == EIGER) + fifosize = EIGER_FIFO_SIZE; + + if(fifosize % numJobsPerThread) + fifosize = (fifosize/numJobsPerThread)+1; + else + fifosize = fifosize/numJobsPerThread; + + + cout << "Number of Frames per buffer:" << numJobsPerThread << endl; + cout << "Fifo Size:" << fifosize << endl; + + /* + //for testing + numJobsPerThread = 3; fifosize = 11; + */ + + for(int i=0;iisEmpty()) + fifoFree[i]->pop(buffer[i]); + delete fifoFree[i]; + } + if(fifo[i]) delete fifo[i]; + if(mem0[i]) free(mem0[i]); + fifoFree[i] = new CircularFifo(fifosize); + fifo[i] = new CircularFifo(fifosize); + + + //allocate memory + mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ + if (mem0[i]==NULL){ + cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + exit(-1); + } + buffer[i]=mem0[i]; + //push the addresses into freed fifoFree and writingFifoFree + while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); + buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + } + } + cout << "Fifo structure(s) reconstructed" << endl; +} + + + + + + + +/** acquisition functions */ + +void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum){ + //point to gui data + if (guiData == NULL) + guiData = latestData; + + //copy data and filename + strcpy(c,guiFileName); + fnum = guiFrameNumber; + + + //could not get gui data + if(!guiDataReady){ + *raw = NULL; + } + //data ready, set guidata to receive new data + else{ + *raw = guiData; + guiData = NULL; + + pthread_mutex_lock(&dataReadyMutex); + guiDataReady = 0; + pthread_mutex_unlock(&dataReadyMutex); + if((nFrameToGui) && (writerthreads_mask)){ + /*if(nFrameToGui){*/ + //release after getting data + sem_post(&smp); + } + } +} + + + + + +void UDPRESTImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ + + //random read when gui not ready + if((!nFrameToGui) && (!guiData)){ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + pthread_mutex_unlock(&dataReadyMutex); + } + + //random read or nth frame read, gui needs data now + else{ + /* + //nth frame read, block current process if the guireader hasnt read it yet + if(nFrameToGui) + sem_wait(&smp); +*/ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + //eiger + if(startbuf != NULL){ + int offset = 0; + int size = frameSize/EIGER_MAX_PORTS; + for(int j=0;jgetErrorStatus(); + if(iret){ +#ifdef VERBOSE + cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; +#endif + return FAIL; + } + } + + return OK; +} + + + + + + + +int UDPRESTImplementation::shutDownUDPSockets(){ + for(int i=0;iShutDownSocket(); + delete udpSocket[i]; + udpSocket[i] = NULL; + } + } + return OK; +} + + + + + +int UDPRESTImplementation::createListeningThreads(bool destroy){ + int i; + void* status; + + killAllListeningThreads = 0; + + pthread_mutex_lock(&status_mutex); + listeningthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + if(!destroy){ + + //start listening threads + cout << "Creating Listening Threads(s)"; + + currentListeningThreadIndex = -1; + + for(i = 0; i < numListeningThreads; ++i){ + sem_init(&listensmp[i],1,0); + thread_started = 0; + currentListeningThreadIndex = i; + if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ + cout << "Could not create listening thread with index " << i << endl; + return FAIL; + } + while(!thread_started); + cout << "."; + cout << flush; + } +#ifdef VERBOSE + cout << "Listening thread(s) created successfully." << endl; +#else + cout << endl; +#endif + }else{ + cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); + //resets the pedestalSubtraction array and the commonModeSubtraction + singlePhotonDet[ithr]->newDataSet(); + if(myFile[ithr]==NULL){ + cout<<"file null"<IsOpen()){ + cout<<"file not open"< DO_NOTHING){ + //close + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + //open file + if(!overwrite){ + if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ + cout << "Error: Could not create new file " << savefilename << endl; + return FAIL; + } + }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ + cout << "Error: Could not create file " << savefilename << endl; + return FAIL; + } + //setting buffer + setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); + + //printing packet losses and file names + if(!packetsCaught) + cout << savefilename << endl; + else{ + cout << savefilename + << "\tpacket loss " + << setw(4)<GetCurrentFile(); + + if(myFile[ithr]->Write()) + //->Write(tall->GetName(),TObject::kOverwrite); + cout << "Thread " << ithr <<": wrote frames to file" << endl; + else + cout << "Thread " << ithr << ": could not write frames to file" << endl; + + }else + cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; + //close file + if(myTree[ithr] && myFile[ithr]) + myFile[ithr] = myTree[ithr]->GetCurrentFile(); + if(myFile[ithr] != NULL) + myFile[ithr]->Close(); + myFile[ithr] = NULL; + myTree[ithr] = NULL; + pthread_mutex_unlock(&write_mutex); + +#endif + } +} + + + + + +int UDPRESTImplementation::startReceiver(char message[]){ + int i; + + +// #ifdef VERBOSE + cout << "Starting Receiver" << endl; +//#endif + + + //reset listening thread variables + measurementStarted = false; + //should be set to zero as its added to get next start frame indices for scans for eiger + if(!acqStarted) currframenum = 0; + startFrameIndex = 0; + + for(int i = 0; i < numListeningThreads; ++i) + totalListeningFrameCount[i] = 0; + + //udp socket + if(createUDPSockets() == FAIL){ + strcpy(message,"Could not create UDP Socket(s).\n"); + cout << endl << message << endl; + return FAIL; + } + cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; + + + if(setupWriter() == FAIL){ + //stop udp socket + shutDownUDPSockets(); + + sprintf(message,"Could not create file %s.\n",savefilename); + return FAIL; + } + cout << "Successfully created file(s)" << endl; + + //done to give the gui some proper name instead of always the last file name + if(dataCompression) + sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + + //initialize semaphore + sem_init(&smp,1,0); + + //status + pthread_mutex_lock(&status_mutex); + status = RUNNING; + for(i=0;istartListening(); + + return this_pointer; +} + + + +void* UDPRESTImplementation::startWritingThread(void* this_pointer){ + ((UDPRESTImplementation*)this_pointer)->startWriting(); + return this_pointer; +} + + + + + + +int UDPRESTImplementation::startListening(){ + int ithread = currentListeningThreadIndex; +#ifdef VERYVERBOSE + cout << "In startListening() " << endl; +#endif + + thread_started = 1; + + int i,total; + int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; + uint32_t lastframeheader;// for moench to check for all the packets in last frame + char* tempchar = NULL; + int imageheader = 0; + if(myDetectorType==EIGER) + imageheader = EIGER_IMAGE_HEADER_SIZE; + + + while(1){ + //variables that need to be checked/set before each acquisition + carryonBufferSize = 0; + //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later + maxBufferSize = bufferSize * numJobsPerThread; +#ifdef VERYDEBUG + cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; +#endif + + if(tempchar) {delete [] tempchar;tempchar = NULL;} + if(myDetectorType != EIGER) + tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size + + + while((1<pop(buffer[ithread]); +#ifdef VERYDEBUG + cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; +#endif + + + //receive + if(udpSocket[ithread] == NULL){ + rc = 0; + cout << ithread << "UDP Socket is NULL" << endl; + } + //normal listening + else if(!carryonBufferSize){ + + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); + expected = maxBufferSize; + + } + //the remaining packets from previous buffer + else{ +#ifdef VERYDEBUG + cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; + cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) + & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); + expected = maxBufferSize - carryonBufferSize; + } + +#ifdef VERYDEBUG + cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; +#endif + + + + + //start indices for each start of scan/acquisition - eiger does it before + if((!measurementStarted) && (rc > 0) && (!ithread)) + startFrameIndices(ithread); + + //problem in receiving or end of acquisition + if((rc < expected)||(rc <= 0)){ + stopListening(ithread,rc,packetcount,total); + continue; + } + + + + //reset + packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; + carryonBufferSize = 0; + + + + //check if last packet valid and calculate packet count + switch(myDetectorType){ + + case MOENCH: + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; + cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; + cout << "last packet offset:" << lastpacketoffset << endl; + cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; + cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + //moench last packet value is 0 + if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ + lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + } + memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) + & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) + & (packetIndexMask)) << endl; +#endif + } + break; + + case GOTTHARD: + if(shortFrame == -1){ + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout << "last packet offset:" << lastpacketoffset << endl; +#endif + + if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ + memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + carryonBufferSize = onePacketSize; + --packetcount; + } + } +#ifdef VERYDEBUG + cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + break; + default: + + break; + + } + + + // cout<<"*********** "<fnum)<push(buffer[ithread])); +#ifdef VERYDEBUG + if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; +#endif + } + + sem_wait(&listensmp[ithread]); + + //make sure its not exiting thread + if(killAllListeningThreads){ + cout << ithread << " good bye listening thread" << endl; + if(tempchar) {delete [] tempchar;tempchar = NULL;} + pthread_exit(NULL); + } + } + + return OK; +} + + + + + + + + + + + + + +int UDPRESTImplementation::startWriting(){ + int ithread = currentWriterThreadIndex; +#ifdef VERYVERBOSE + cout << ithread << "In startWriting()" <pop(wbuf[i]); + numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; +#endif + } + +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; +#endif + + + //last dummy packet + if(numpackets == 0xFFFF){ + stopWriting(ithread,wbuf); + continue; + } + + + + + //for progress + if(myDetectorType == EIGER){ + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + pthread_mutex_lock(&progress_mutex); + if(tempframenum > currframenum) + currframenum = tempframenum; + pthread_mutex_unlock(&progress_mutex); + } +//#ifdef VERYDEBUG + if(myDetectorType == EIGER) + cout << endl < 0){ + for(i=0;ipush(wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; +#endif + } + + + } + else{ + //copy to gui + copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYVERBOSE + cout << ithread << " finished copying" << endl; +#endif + while(!fifoFree[0]->push(wbuf[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuf[0]<fnum); + //gotthard has +1 for frame number and not a short frame + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + else + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + + + //start of acquisition + if(!acqStarted){ + startAcquisitionIndex=startFrameIndex; + currframenum = startAcquisitionIndex; + acqStarted = true; + cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); + exit(-1); + } + //push the last buffer into fifo + if(rc > 0){ + pc = (rc/onePacketSize); +#ifdef VERYDEBUG + cout << ithread << " *** last packetcount:" << pc << endl; +#endif + (*((uint16_t*)(buffer[ithread]))) = pc; + totalListeningFrameCount[ithread] += pc; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; +#endif + } + + + //push dummy buffer to all writer threads + for(i=0;ipop(buffer[ithread]); + (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; +#ifdef VERYDEBUG + cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; +#endif + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; +#endif + } + + //reset mask and exit loop + pthread_mutex_lock(&status_mutex); + listeningthreads_mask^=(1< 1) + cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; +#endif + while(listeningthreads_mask) + usleep(5000); +#ifdef VERYDEBUG + t = 0; + for(i=0;ipush(wbuffer[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; +#endif + } + + + + //all threads need to close file, reset mask and exit loop + closeFile(ithread); + pthread_mutex_lock(&status_mutex); + writerthreads_mask^=(1< 0){ + + //for progress and packet loss calculation(new files) + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + + //lock + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + + + //to create new file when max reached + packetsToSave = maxPacketsPerFile - packetsInFile; + if(packetsToSave > numpackets) + packetsToSave = numpackets; +/**next time offset is still plus header length*/ + fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); + packetsInFile += packetsToSave; + packetsCaught += packetsToSave; + totalPacketsCaught += packetsToSave; + + + //new file + if(packetsInFile >= maxPacketsPerFile){ + //for packet loss + lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + //create + createNewFile(); + } + + //unlock + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + + + offset += (packetsToSave * onePacketSize); + numpackets -= packetsToSave; + } + + } + else{ + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + packetsInFile += numpackets; + packetsCaught += numpackets; + totalPacketsCaught += numpackets; + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + } +} + + + + + + + + + + + + + + +void UDPRESTImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ + +#if defined(MYROOT1) && defined(ALLFILE_DEBUG) + writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); +#endif + + eventType thisEvent = PEDESTAL; + int ndata; + char* buff = 0; + data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; + int remainingsize = npackets * onePacketSize; + int np; + int once = 0; + double tot, tl, tr, bl, br; + int xmin = 1, ymin = 1, ix, iy; + + + while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ + np = ndata/onePacketSize; + + //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); + + //only for moench + if(commonModeSubtractionEnable){ + for(ix = xmin - 1; ix < xmax+1; ix++){ + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); + } + } + } + + + for(ix = xmin - 1; ix < xmax+1; ix++) + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); + if (nf>1000) { + tot=0; + tl=0; + tr=0; + bl=0; + br=0; + if (thisEvent==PHOTON_MAX) { + receiverdata[ithread]->getFrameNumber(buff); + //iFrame=receiverdata[ithread]->getFrameNumber(buff); +#ifdef MYROOT1 + myTree[ithread]->Fill(); + //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; +#else + pthread_mutex_lock(&write_mutex); + if((enableFileWrite) && (sfilefd)) + singlePhotonDet[ithread]->writeCluster(sfilefd); + pthread_mutex_unlock(&write_mutex); +#endif + } + } + } + + nf++; +#ifndef ALLFILE + pthread_mutex_lock(&progress_mutex); + packetsInFile += packetsPerFrame; + packetsCaught += packetsPerFrame; + totalPacketsCaught += packetsPerFrame; + if(packetsInFile >= maxPacketsPerFile) + createNewFile(); + pthread_mutex_unlock(&progress_mutex); + +#endif + if(!once){ + copyFrameToGui(NULL,-1,buff); + once = 1; + } + } + + remainingsize -= ((buff + ndata) - data); + data = buff + ndata; + if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) + cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ + + tengigaEnable = enable; + + if(myDetectorType == EIGER){ + + if(!tengigaEnable){ + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + }else{ + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; + } + frameSize = onePacketSize * packetsPerFrame; + bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) + //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + + cout<<"packetsPerFrame:"< +#include +#endif + +#include "RestHelper.h" + + +#include +#include +#include +#include + +/** + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + */ + +class UDPRESTImplementation : private virtual slsReceiverDefs, public UDPBaseImplementation { + + public: + /** + * Constructor + */ + UDPRESTImplementation(); + + /** + * Destructor + */ + virtual ~UDPRESTImplementation(); + + + + /** + * delete and free member parameters + */ + void deleteMembers(); + + /** + * initialize member parameters + */ + void initializeMembers(); + + /** + * Set receiver type + * @param det detector type + * Returns success or FAIL + */ + int setDetectorType(detectorType det); + + + //Frame indices and numbers caught + /** + * Returns current Frame Index Caught for an entire acquisition (including all scans) + */ + uint32_t getAcquisitionIndex(); + + /** + * Returns if acquisition started + */ + bool getAcquistionStarted(); + + /** + * Returns Frames Caught for each real time acquisition (eg. for each scan) + */ + int getFramesCaught(); + + /** + * Returns Total Frames Caught for an entire acquisition (including all scans) + */ + int getTotalFramesCaught(); + + /** + * Returns the frame index at start of each real time acquisition (eg. for each scan) + */ + uint32_t getStartFrameIndex(); + + /** + * Returns current Frame Index for each real time acquisition (eg. for each scan) + */ + uint32_t getFrameIndex(); + + /** + * Returns if measurement started + */ + bool getMeasurementStarted(); + + /** + * Resets the Total Frames Caught + * This is how the receiver differentiates between entire acquisitions + * Returns 0 + */ + void resetTotalFramesCaught(); + + + + + //file parameters + /** + * Returns File Path + */ + //char* getFilePath() const; + + /** + * Set File Path + * @param c file path + */ + //char* setFilePath(const char c[]); + + /** + * Returns File Name + */ + //char* getFileName() const; + + /** + * Set File Name (without frame index, file index and extension) + * @param c file name + */ + //char* setFileName(const char c[]); + + /** + * Returns File Index + */ + int getFileIndex(); + + /** + * Set File Index + * @param i file index + */ + int setFileIndex(int i); + + /** + * Set Frame Index Needed + * @param i frame index needed + */ + int setFrameIndexNeeded(int i); + + /** + * Set enable file write + * @param i file write enable + * Returns file write enable + */ + //int setEnableFileWrite(int i); + + /** + * Enable/disable overwrite + * @param i enable + * Returns enable over write + */ + //int setEnableOverwrite(int i); + + /** + * Returns file write enable + * 1: YES 0: NO + */ + //int getEnableFileWrite() const; + + /** + * Returns file over write enable + * 1: YES 0: NO + */ + //int getEnableOverwrite() const; + +//other parameters + + /** + * abort acquisition with minimum damage: close open files, cleanup. + * does nothing if state already is 'idle' + */ + void abort() {}; + + /** + * Returns status of receiver: idle, running or error + */ + runStatus getStatus() const; + + /** + * Set detector hostname + * @param c hostname + */ + void initialize(const char *detectorHostName); + + /* Returns detector hostname + /returns hostname + * caller needs to deallocate the returned char array. + * if uninitialized, it must return NULL + */ + //char *getDetectorHostname() const; + + /** + * Set Ethernet Interface or IP to listen to + */ + void setEthernetInterface(char* c); + + /** + * Set UDP Port Number + */ + void setUDPPortNo(int p); + + /* + * Returns number of frames to receive + * This is the number of frames to expect to receiver from the detector. + * The data receiver will change from running to idle when it got this number of frames + */ + + //int getNumberOfFrames() const; + + /** + * set frame number if a positive number + */ + //int32_t setNumberOfFrames(int32_t fnum); + + + /** + * Returns scan tag + */ + //int getScanTag() const; + + /** + * set scan tag if its is a positive number + */ + //int32_t setScanTag(int32_t stag); + + /** + * Returns the number of bits per pixel + */ + //int getDynamicRange() const; + + /** + * set dynamic range if its is a positive number + */ + int32_t setDynamicRange(int32_t dr); + + /** + * Set short frame + * @param i if shortframe i=1 + */ + int setShortFrame(int i); + + /** + * Set the variable to send every nth frame to gui + * or if 0,send frame only upon gui request + */ + int setNFrameToGui(int i); + + /** set acquisition period if a positive number + */ + int64_t setAcquisitionPeriod(int64_t index); + + /** get data compression, by saving only hits + */ + bool getDataCompression(); + + /** enabl data compression, by saving only hits + /returns if failed + */ + int enableDataCompression(bool enable); + + /** + * enable 10Gbe + @param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out + \returns enable for 10Gbe + */ + int enableTenGiga(int enable = -1); + + + +//other functions + + /** + * Returns the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + * @param fnum frame number for eiger as it is not in the packet + */ + void readFrame(char* c,char** raw, uint32_t &fnum); + + /** + * Closes all files + * @param ithr thread index + */ + void closeFile(int ithr = -1); + + /** + * Starts Receiver - starts to listen for packets + * @param message is the error message if there is an error + * Returns success + */ + int startReceiver(char message[]); + + /** + * Stops Receiver - stops listening for packets + * Returns success + */ + int stopReceiver(); + + /** set status to transmitting and + * when fifo is empty later, sets status to run_finished + */ + void startReadout(); + + /** + * shuts down the udp sockets + * \returns if success or fail + */ + int shutDownUDPSockets(); + +private: + + /* + void not_implemented(string method_name){ + std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; + }; + */ + /** + * Deletes all the filter objects for single photon data + */ + void deleteFilter(); + + /** + * Constructs the filter for single photon data + */ + void setupFilter(); + + /** + * set up fifo according to the new numjobsperthread + */ + void setupFifoStructure (); + + /** + * Copy frames to gui + * uses semaphore for nth frame mode + */ + void copyFrameToGui(char* startbuf[], uint32_t fnum=-1, char* buf=NULL); + + /** + * creates udp sockets + * \returns if success or fail + */ + int createUDPSockets(); + + /** + * create listening thread + * @param destroy is true to kill all threads and start again + */ + int createListeningThreads(bool destroy = false); + + /** + * create writer threads + * @param destroy is true to kill all threads and start again + */ + int createWriterThreads(bool destroy = false); + + /** + * set thread priorities + */ + void setThreadPriorities(); + + /** + * initializes variables and creates the first file + * also does the startAcquisitionCallBack + * \returns FAIL or OK + */ + int setupWriter(); + + /** + * Creates new tree and file for compression + * @param ithr thread number + * @param iframe frame number + *\returns OK for succces or FAIL for failure + */ + int createCompressionFile(int ithr, int iframe); + + /** + * Creates new file + *\returns OK for succces or FAIL for failure + */ + int createNewFile(); + + /** + * Static function - Thread started which listens to packets. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startListeningThread(void *this_pointer); + + /** + * Static function - Thread started which writes packets to file. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startWritingThread(void *this_pointer); + + /** + * Thread started which listens to packets. + * Called by startReceiver() + * + */ + int startListening(); + + /** + * Thread started which writes packets to file. + * Called by startReceiver() + * + */ + int startWriting(); + + /** + * Writing to file without compression + * @param buf is the address of buffer popped out of fifo + * @param numpackets is the number of packets + * @param framenum current frame number + */ + void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum); + + /** + * Its called for the first packet of a scan or acquistion + * Sets the startframeindices and the variables to know if acquisition started + * @param ithread listening thread number + */ + void startFrameIndices(int ithread); + + /** + * This is called when udp socket is shut down + * It pops ffff instead of packet number into fifo + * to inform writers about the end of listening session + * @param ithread listening thread number + * @param rc number of bytes received + * @param pc packet count + * @param t total packets listened to + */ + void stopListening(int ithread, int rc, int &pc, int &t); + + /** + * When acquisition is over, this is called + * @param ithread listening thread number + * @param wbuffer writer buffer + */ + void stopWriting(int ithread, char* wbuffer[]); + + + /** + * data compression for each fifo output + * @param ithread listening thread number + * @param wbuffer writer buffer + * @param npackets number of packets from the fifo + * @param data pointer to the next packet start + * @param xmax max pixels in x direction + * @param ymax max pixels in y direction + * @param nf nf + */ + void handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf); + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char header_before[20]; + unsigned char fnum[4]; + unsigned char header_after[24]; + } eiger_image_header; + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char num1[4]; + unsigned char num2[4]; + } eiger_packet_header; + + /** max number of listening threads */ + const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS; + + /** max number of writer threads */ + const static int MAX_NUM_WRITER_THREADS = 15; + + /** detector type */ + detectorType myDetectorType; + + /** detector hostname */ + char detHostname[MAX_STR_LENGTH]; + + /** status of receiver */ + runStatus status; + + /** UDP Socket between Receiver and Detector */ + genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; + + /** Server UDP Port*/ + int server_port[MAX_NUM_LISTENING_THREADS]; + + /** ethernet interface or IP to listen to */ + char *eth; + + /** max packets per file **/ + int maxPacketsPerFile; + + /** File write enable */ + int enableFileWrite; + + /** File over write enable */ + int overwrite; + + /** Complete File name */ + char savefilename[MAX_STR_LENGTH]; + + /** File Name without frame index, file index and extension*/ + char fileName[MAX_STR_LENGTH]; + + /** File Path */ + char filePath[MAX_STR_LENGTH]; + + /** File Index */ + int fileIndex; + + /** scan tag */ + int scanTag; + + /** if frame index required in file name */ + int frameIndexNeeded; + + /* Acquisition started */ + bool acqStarted; + + /* Measurement started */ + bool measurementStarted; + + /** Frame index at start of each real time acquisition (eg. for each scan) */ + uint32_t startFrameIndex; + + /** Actual current frame index of each time acquisition (eg. for each scan) */ + uint32_t frameIndex; + + /** Frames Caught for each real time acquisition (eg. for each scan) */ + int packetsCaught; + + /** Total packets caught for an entire acquisition (including all scans) */ + int totalPacketsCaught; + + /** Pckets currently in current file, starts new file when it reaches max */ + int packetsInFile; + + /** Frame index at start of an entire acquisition (including all scans) */ + uint32_t startAcquisitionIndex; + + /** Actual current frame index of an entire acquisition (including all scans) */ + uint32_t acquisitionIndex; + + /** number of packets per frame*/ + int packetsPerFrame; + + /** frame index mask */ + uint32_t frameIndexMask; + + /** packet index mask */ + uint32_t packetIndexMask; + + /** frame index offset */ + int frameIndexOffset; + + /** acquisition period */ + int64_t acquisitionPeriod; + + /** frame number */ + int32_t numberOfFrames; + + /** dynamic range */ + int dynamicRange; + + /** short frames */ + int shortFrame; + + /** current frame number */ + uint32_t currframenum; + + /** Previous Frame number from buffer */ + uint32_t prevframenum; + + /** size of one frame */ + int frameSize; + + /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ + int bufferSize; + + /** oen buffer size */ + int onePacketSize; + + /** latest data */ + char* latestData; + + /** gui data ready */ + int guiDataReady; + + /** points to the data to send to gui */ + char* guiData; + + /** points to the filename to send to gui */ + char* guiFileName; + + /** temporary number for eiger frame number as its not included in the packet */ + uint32_t guiFrameNumber; + + /** send every nth frame to gui or only upon gui request*/ + int nFrameToGui; + + /** fifo size */ + unsigned int fifosize; + + /** number of jobs per thread for data compression */ + int numJobsPerThread; + + /** datacompression - save only hits */ + bool dataCompression; + + /** memory allocated for the buffer */ + char *mem0[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data read */ + CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data already written and ready to be resued*/ + CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; + + /** Receiver buffer */ + char *buffer[MAX_NUM_LISTENING_THREADS]; + + /** number of writer threads */ + int numListeningThreads; + + /** number of writer threads */ + int numWriterThreads; + + /** to know if listening and writer threads created properly */ + int thread_started; + + /** current listening thread index*/ + int currentListeningThreadIndex; + + /** current writer thread index*/ + int currentWriterThreadIndex; + + /** thread listening to packets */ + pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; + + /** thread writing packets */ + pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; + + /** total frame count the listening thread has listened to */ + int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; + + /** mask showing which listening threads are running */ + volatile uint32_t listeningthreads_mask; + + /** mask showing which writer threads are running */ + volatile uint32_t writerthreads_mask; + + /** mask showing which threads have created files*/ + volatile uint32_t createfile_mask; + + /** OK if file created was successful */ + int ret_createfile; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllListeningThreads; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllWritingThreads; + + /** 10Gbe enable*/ + int tengigaEnable; + + + + +//semaphores + /** semaphore to synchronize writer and guireader threads */ + sem_t smp; + /** semaphore to synchronize listener threads */ + sem_t listensmp[MAX_NUM_LISTENING_THREADS]; + /** semaphore to synchronize writer threads */ + sem_t writersmp[MAX_NUM_WRITER_THREADS]; + + +//mutex + /** guiDataReady mutex */ + pthread_mutex_t dataReadyMutex; + + /** mutex for status */ + pthread_mutex_t status_mutex; + + /** mutex for progress variable currframenum */ + pthread_mutex_t progress_mutex; + + /** mutex for writing data to file */ + pthread_mutex_t write_mutex; + + /** File Descriptor */ + FILE *sfilefd; + + //filter + singlePhotonDetector *singlePhotonDet[MAX_NUM_WRITER_THREADS]; + slsReceiverData *receiverdata[MAX_NUM_WRITER_THREADS]; + moenchCommonMode *cmSub; + bool commonModeSubtractionEnable; + +#ifdef MYROOT1 + /** Tree where the hits are stored */ + TTree *myTree[MAX_NUM_WRITER_THREADS]; + + /** File where the tree is saved */ + TFile *myFile[MAX_NUM_WRITER_THREADS]; +#endif + + + + /** + callback arguments are + filepath + filename + fileindex + data size + + return value is + 0 callback takes care of open,close,write file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + + */ + int (*startAcquisitionCallBack)(char*, char*,int, int, void*); + void *pStartAcquisition; + + /** + args to acquisition finished callback + total frames caught + + */ + void (*acquisitionFinishedCallBack)(int, void*); + void *pAcquisitionFinished; + + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void (*rawDataReadyCallBack)(int, char*, int, FILE*, char*, void*); + void *pRawDataReady; + + /** The action which decides what the user and default responsibilites to save data are + * 0 raw data ready callback takes care of open,close,write file + * 1 callback writes file, we have to open, close it + * 2 we open, close, write file, callback does not do anything */ + int cbAction; + + +public: + + + /** + callback arguments are + filepath + filename + fileindex + datasize + + return value is + 0 callback takes care of open,close,wrie file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + */ + void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; + + /** + callback argument is + toatal frames caught + */ + void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;}; + + + //REST specific + bool isInitialized; + RestHelper * rest ; +}; + + +#endif + +//#endif /*REST*/ diff --git a/slsReceiverSoftware/slsReceiver/UDPStandardImplementation.cpp b/slsReceiverSoftware/slsReceiver/UDPStandardImplementation.cpp new file mode 100644 index 000000000..1314f2d5c --- /dev/null +++ b/slsReceiverSoftware/slsReceiver/UDPStandardImplementation.cpp @@ -0,0 +1,2330 @@ +/********************************************//** + * @file UDPStandardImplementation.cpp + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + ***********************************************/ + + +#include "UDPStandardImplementation.h" + +#include "moench02ModuleData.h" +#include "gotthardModuleData.h" +#include "gotthardShortModuleData.h" + + +#include // SIGINT +#include // stat +#include // socket(), bind(), listen(), accept(), shut down +#include // sock_addr_in, htonl, INADDR_ANY +#include // exit() +#include //set precision +#include //munmap + + + +#include +#include + + + +using namespace std; + +void UDPStandardImplementation::initializeMembers(){ + myDetectorType = GENERIC; + maxPacketsPerFile = 0; + enableFileWrite = 1; + overwrite = 1; + fileIndex = 0; + scanTag = 0; + frameIndexNeeded = 0; + acqStarted = false; + measurementStarted = false; + startFrameIndex = 0; + frameIndex = 0; + packetsCaught = 0; + totalPacketsCaught = 0; + packetsInFile = 0; + startAcquisitionIndex = 0; + acquisitionIndex = 0; + packetsPerFrame = 0; + frameIndexMask = 0; + packetIndexMask = 0; + frameIndexOffset = 0; + acquisitionPeriod = SAMPLE_TIME_IN_NS; + numberOfFrames = 0; + dynamicRange = 16; + shortFrame = -1; + currframenum = 0; + prevframenum = 0; + frameSize = 0; + bufferSize = 0; + onePacketSize = 0; + guiDataReady = 0; + nFrameToGui = 0; + fifosize = 0; + numJobsPerThread = -1; + dataCompression = false; + numListeningThreads = 1; + numWriterThreads = 1; + thread_started = 0; + currentListeningThreadIndex = -1; + currentWriterThreadIndex = -1; + for(int i=0;i /proc/sys/net/core/rmem_max")) + cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl; + else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog")) + cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl; + /** permanent setting heiner + net.core.rmem_max = 104857600 # 100MiB + net.core.netdev_max_backlog = 250000 + sysctl -p + // from the manual + sysctl -w net.core.rmem_max=16777216 + sysctl -w net.core.netdev_max_backlog=250000 + */ +} + + + +UDPStandardImplementation::~UDPStandardImplementation(){ + createListeningThreads(true); + createWriterThreads(true); + deleteMembers(); +} + + + + +void UDPStandardImplementation::deleteMembers(){ + //kill threads + if(thread_started){ + createListeningThreads(true); + createWriterThreads(true); + } + + for(int i=0;i=0) + fileIndex = i; + return getFileIndex(); +} +*/ + +/* +int UDPStandardImplementation::setFrameIndexNeeded(int i){ + frameIndexNeeded = i; + return frameIndexNeeded; +} +*/ + +/* +int UDPStandardImplementation::getEnableFileWrite() const{ + return enableFileWrite; +} +*/ + +/* +int UDPStandardImplementation::setEnableFileWrite(int i){ + enableFileWrite=i; + return getEnableFileWrite(); +} +*/ + +/* +int UDPStandardImplementation::getEnableOverwrite() const{ + return overwrite; +} +*/ + +/* +int UDPStandardImplementation::setEnableOverwrite(int i){ + overwrite=i; + return getEnableOverwrite(); +} +*/ + + + + +/*other parameters*/ + +slsReceiverDefs::runStatus UDPStandardImplementation::getStatus() const{ + return status; +} + + +void UDPStandardImplementation::initialize(const char *detectorHostName){ + if(strlen(detectorHostName)) + strcpy(detHostname,detectorHostName); +} + + +char *UDPStandardImplementation::getDetectorHostname() const{ + return (char*)detHostname; +} + +void UDPStandardImplementation::setEthernetInterface(char* c){ + strcpy(eth,c); +} + + +void UDPStandardImplementation::setUDPPortNo(int p){ + for(int i=0;i= 0) + numberOfFrames = fnum; + + return getNumberOfFrames(); +} + +int UDPStandardImplementation::getScanTag() const{ + return scanTag; +} + + +int32_t UDPStandardImplementation::setScanTag(int32_t stag){ + if(stag >= 0) + scanTag = stag; + + return getScanTag(); +} + + +int UDPStandardImplementation::getDynamicRange() const{ + return dynamicRange; +} + +int32_t UDPStandardImplementation::setDynamicRange(int32_t dr){ + cout << "Setting Dynamic Range" << endl; + + int olddr = dynamicRange; + if(dr >= 0){ + dynamicRange = dr; + + if(myDetectorType == EIGER){ + + + if(!tengigaEnable) + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + else + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + frameSize = onePacketSize * packetsPerFrame; + bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + + + if(olddr != dr){ + + //del + if(thread_started){ + createListeningThreads(true); + createWriterThreads(true); + } + for(int i=0;i=0){ + nFrameToGui = i; + setupFifoStructure(); + } + return nFrameToGui; +} + + + +int64_t UDPStandardImplementation::setAcquisitionPeriod(int64_t index){ + + if(index >= 0){ + if(index != acquisitionPeriod){ + acquisitionPeriod = index; + setupFifoStructure(); + } + } + return acquisitionPeriod; +} + + +bool UDPStandardImplementation::getDataCompression(){return dataCompression;} + +int UDPStandardImplementation::enableDataCompression(bool enable){ + cout << "Data compression "; + if(enable) + cout << "enabled" << endl; + else + cout << "disabled" << endl; +#ifdef MYROOT1 + cout << " WITH ROOT" << endl; +#else + cout << " WITHOUT ROOT" << endl; +#endif + //delete filter for the current number of threads + deleteFilter(); + + dataCompression = enable; + pthread_mutex_lock(&status_mutex); + writerthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + createWriterThreads(true); + + if(enable) + numWriterThreads = MAX_NUM_WRITER_THREADS; + else + numWriterThreads = 1; + + if(createWriterThreads() == FAIL){ + cout << "ERROR: Could not create writer threads" << endl; + return FAIL; + } + setThreadPriorities(); + + + if(enable) + setupFilter(); + + return OK; +} + + + + + + + + + + + + +/*other functions*/ + + +void UDPStandardImplementation::deleteFilter(){ + int i; + cmSub=NULL; + + for(i=0;i(receiverdata[i], csize, sigma, sign, cmSub); + +} + + + +//LEO: it is not clear to me.. +void UDPStandardImplementation::setupFifoStructure(){ + + int64_t i; + int oldn = numJobsPerThread; + + //if every nth frame mode + if(nFrameToGui) + numJobsPerThread = nFrameToGui; + + //random nth frame mode + else{ + if(!acquisitionPeriod) + i = SAMPLE_TIME_IN_NS; + else + i = SAMPLE_TIME_IN_NS/acquisitionPeriod; + if (i > MAX_JOBS_PER_THREAD) + numJobsPerThread = MAX_JOBS_PER_THREAD; + else if (i < 1) + numJobsPerThread = 1; + else + numJobsPerThread = i; + } + + //if same, return + if(oldn == numJobsPerThread) + return; + + if(myDetectorType == EIGER) + numJobsPerThread = 1; + + //otherwise memory too much if numjobsperthread is at max = 1000 + fifosize = GOTTHARD_FIFO_SIZE; + if(myDetectorType == MOENCH) + fifosize = MOENCH_FIFO_SIZE; + else if(myDetectorType == EIGER) + fifosize = EIGER_FIFO_SIZE; + + if(fifosize % numJobsPerThread) + fifosize = (fifosize/numJobsPerThread)+1; + else + fifosize = fifosize/numJobsPerThread; + + + cout << "Number of Frames per buffer:" << numJobsPerThread << endl; + cout << "Fifo Size:" << fifosize << endl; + + /* + //for testing + numJobsPerThread = 3; fifosize = 11; + */ + + for(int i=0;iisEmpty()) + fifoFree[i]->pop(buffer[i]); + delete fifoFree[i]; + } + if(fifo[i]) delete fifo[i]; + if(mem0[i]) free(mem0[i]); + fifoFree[i] = new CircularFifo(fifosize); + fifo[i] = new CircularFifo(fifosize); + + + //allocate memory + mem0[i]=(char*)malloc((bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*fifosize); + /** shud let the client know about this */ + if (mem0[i]==NULL){ + cout<<"++++++++++++++++++++++ COULD NOT ALLOCATE MEMORY FOR LISTENING !!!!!!!+++++++++++++++++++++" << endl; + exit(-1); + } + buffer[i]=mem0[i]; + //push the addresses into freed fifoFree and writingFifoFree + while (buffer[i]<(mem0[i]+(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS)*(fifosize-1))) { + fifoFree[i]->push(buffer[i]); + buffer[i]+=(bufferSize * numJobsPerThread + HEADER_SIZE_NUM_TOT_PACKETS); + } + } + cout << "Fifo structure(s) reconstructed" << endl; +} + + + + + + + +/** acquisition functions */ + +void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){ + //point to gui data + if (guiData == NULL) + guiData = latestData; + + //copy data and filename + strcpy(c,guiFileName); + fnum = guiFrameNumber; + + + //could not get gui data + if(!guiDataReady){ + *raw = NULL; + } + //data ready, set guidata to receive new data + else{ + *raw = guiData; + guiData = NULL; + + pthread_mutex_lock(&dataReadyMutex); + guiDataReady = 0; + pthread_mutex_unlock(&dataReadyMutex); + if((nFrameToGui) && (writerthreads_mask)){ + /*if(nFrameToGui){*/ + //release after getting data + sem_post(&smp); + } + } +} + + + + + +void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ + + //random read when gui not ready + if((!nFrameToGui) && (!guiData)){ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + pthread_mutex_unlock(&dataReadyMutex); + } + + //random read or nth frame read, gui needs data now + else{ + /* + //nth frame read, block current process if the guireader hasnt read it yet + if(nFrameToGui) + sem_wait(&smp); +*/ + pthread_mutex_lock(&dataReadyMutex); + guiDataReady=0; + //eiger + if(startbuf != NULL){ + int offset = 0; + int size = frameSize/EIGER_MAX_PORTS; + for(int j=0;jgetErrorStatus(); + if(iret){ +#ifdef VERBOSE + cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; +#endif + return FAIL; + } + } + + return OK; +} + + + + + + + +int UDPStandardImplementation::shutDownUDPSockets(){ + for(int i=0;iShutDownSocket(); + delete udpSocket[i]; + udpSocket[i] = NULL; + } + } + return OK; +} + + + + + +int UDPStandardImplementation::createListeningThreads(bool destroy){ + int i; + void* status; + + killAllListeningThreads = 0; + + pthread_mutex_lock(&status_mutex); + listeningthreads_mask = 0x0; + pthread_mutex_unlock(&(status_mutex)); + + if(!destroy){ + + //start listening threads + cout << "Creating Listening Threads(s)"; + + currentListeningThreadIndex = -1; + + for(i = 0; i < numListeningThreads; ++i){ + sem_init(&listensmp[i],1,0); + thread_started = 0; + currentListeningThreadIndex = i; + if(pthread_create(&listening_thread[i], NULL,startListeningThread, (void*) this)){ + cout << "Could not create listening thread with index " << i << endl; + return FAIL; + } + while(!thread_started); + cout << "."; + cout << flush; + } +#ifdef VERBOSE + cout << "Listening thread(s) created successfully." << endl; +#else + cout << endl; +#endif + }else{ + cout<<"Destroying Listening Thread(s)"<initEventTree(temp, &iframe); + //resets the pedestalSubtraction array and the commonModeSubtraction + singlePhotonDet[ithr]->newDataSet(); + if(myFile[ithr]==NULL){ + cout<<"file null"<IsOpen()){ + cout<<"file not open"< DO_NOTHING){ + //close + if(sfilefd){ + fclose(sfilefd); + sfilefd = NULL; + } + //open file + if(!overwrite){ + if (NULL == (sfilefd = fopen((const char *) (savefilename), "wx"))){ + cout << "Error: Could not create new file " << savefilename << endl; + return FAIL; + } + }else if (NULL == (sfilefd = fopen((const char *) (savefilename), "w"))){ + cout << "Error: Could not create file " << savefilename << endl; + return FAIL; + } + //setting buffer + setvbuf(sfilefd,NULL,_IOFBF,BUF_SIZE); + + //printing packet losses and file names + if(!packetsCaught) + cout << savefilename << endl; + else{ + cout << savefilename + << "\tpacket loss " + << setw(4)<GetCurrentFile(); + + if(myFile[ithr]->Write()) + //->Write(tall->GetName(),TObject::kOverwrite); + cout << "Thread " << ithr <<": wrote frames to file" << endl; + else + cout << "Thread " << ithr << ": could not write frames to file" << endl; + + }else + cout << "Thread " << ithr << ": could not write frames to file: No file or No Tree" << endl; + //close file + if(myTree[ithr] && myFile[ithr]) + myFile[ithr] = myTree[ithr]->GetCurrentFile(); + if(myFile[ithr] != NULL) + myFile[ithr]->Close(); + myFile[ithr] = NULL; + myTree[ithr] = NULL; + pthread_mutex_unlock(&write_mutex); + +#endif + } +} + + + + + +int UDPStandardImplementation::startReceiver(char message[]){ + int i; + + +// #ifdef VERBOSE + cout << "Starting Receiver" << endl; +//#endif + + + //reset listening thread variables + measurementStarted = false; + //should be set to zero as its added to get next start frame indices for scans for eiger + if(!acqStarted) currframenum = 0; + startFrameIndex = 0; + + for(int i = 0; i < numListeningThreads; ++i) + totalListeningFrameCount[i] = 0; + + //udp socket + if(createUDPSockets() == FAIL){ + strcpy(message,"Could not create UDP Socket(s).\n"); + cout << endl << message << endl; + return FAIL; + } + cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; + + + if(setupWriter() == FAIL){ + //stop udp socket + shutDownUDPSockets(); + + sprintf(message,"Could not create file %s.\n",savefilename); + return FAIL; + } + cout << "Successfully created file(s)" << endl; + + //done to give the gui some proper name instead of always the last file name + if(dataCompression) + sprintf(savefilename, "%s/%s_fxxx_%d_xx.root", filePath,fileName,fileIndex); + + //initialize semaphore + sem_init(&smp,1,0); + + //status + pthread_mutex_lock(&status_mutex); + status = RUNNING; + for(i=0;istartListening(); + + return this_pointer; +} + + + +void* UDPStandardImplementation::startWritingThread(void* this_pointer){ + ((UDPStandardImplementation*)this_pointer)->startWriting(); + return this_pointer; +} + + + + + + +int UDPStandardImplementation::startListening(){ + int ithread = currentListeningThreadIndex; +#ifdef VERYVERBOSE + cout << "In startListening() " << endl; +#endif + + thread_started = 1; + + int i,total; + int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize; + uint32_t lastframeheader;// for moench to check for all the packets in last frame + char* tempchar = NULL; + int imageheader = 0; + if(myDetectorType==EIGER) + imageheader = EIGER_IMAGE_HEADER_SIZE; + + + while(1){ + //variables that need to be checked/set before each acquisition + carryonBufferSize = 0; + //if more than 1 listening thread, listen one packet at a time, else need to interleaved frame later + maxBufferSize = bufferSize * numJobsPerThread; +#ifdef VERYDEBUG + cout << " maxBufferSize:" << maxBufferSize << ",carryonBufferSize:" << carryonBufferSize << endl; +#endif + + if(tempchar) {delete [] tempchar;tempchar = NULL;} + if(myDetectorType != EIGER) + tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size + + + while((1<pop(buffer[ithread]); +#ifdef VERYDEBUG + cout << ithread << " *** popped from fifo free" << (void*)buffer[ithread] << endl; +#endif + + + //receive + if(udpSocket[ithread] == NULL){ + rc = 0; + cout << ithread << "UDP Socket is NULL" << endl; + } + //normal listening + else if(!carryonBufferSize){ + + rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); + expected = maxBufferSize; + + } + //the remaining packets from previous buffer + else{ +#ifdef VERYDEBUG + cout << ithread << " ***carry on buffer" << carryonBufferSize << endl; + cout << ithread << " framennum in temochar:"<<((((uint32_t)(*((uint32_t*)tempchar))) + & (frameIndexMask)) >> frameIndexOffset)<ReceiveDataOnly((buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS + carryonBufferSize),maxBufferSize - carryonBufferSize); + expected = maxBufferSize - carryonBufferSize; + } + +#ifdef VERYDEBUG + cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; +#endif + + + + + //start indices for each start of scan/acquisition - eiger does it before + if((!measurementStarted) && (rc > 0) && (!ithread)) + startFrameIndices(ithread); + + //problem in receiving or end of acquisition + if((rc < expected)||(rc <= 0)){ + stopListening(ithread,rc,packetcount,total); + continue; + } + + + + //reset + packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; + carryonBufferSize = 0; + + + + //check if last packet valid and calculate packet count + switch(myDetectorType){ + + case MOENCH: + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout <<"first packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (packetIndexMask)) << endl; + cout <<"first header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS))))) & (frameIndexMask)) >> frameIndexOffset) << endl; + cout << "last packet offset:" << lastpacketoffset << endl; + cout <<"last packet:"<< ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask)) << endl; + cout <<"last header:"<< (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + //moench last packet value is 0 + if( ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (packetIndexMask))){ + lastframeheader = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset; + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + while (lastframeheader == (((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))) & (frameIndexMask)) >> frameIndexOffset)){ + carryonBufferSize += onePacketSize; + lastpacketoffset -= onePacketSize; + --packetcount; + } + memcpy(tempchar, buffer[ithread]+(lastpacketoffset+onePacketSize), carryonBufferSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))) + & (frameIndexMask)) >> frameIndexOffset) << endl; + cout <<"tempchar packet:"<< ((((uint32_t)(*((uint32_t*)(tempchar))))) + & (packetIndexMask)) << endl; +#endif + } + break; + + case GOTTHARD: + if(shortFrame == -1){ + lastpacketoffset = (((numJobsPerThread * packetsPerFrame - 1) * onePacketSize) + HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYDEBUG + cout << "last packet offset:" << lastpacketoffset << endl; +#endif + + if((unsigned int)(packetsPerFrame -1) != ((((uint32_t)(*((uint32_t*)(buffer[ithread]+lastpacketoffset))))+1) & (packetIndexMask))){ + memcpy(tempchar,buffer[ithread]+lastpacketoffset, onePacketSize); +#ifdef VERYDEBUG + cout << "tempchar header:" << (((((uint32_t)(*((uint32_t*)(tempchar))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + carryonBufferSize = onePacketSize; + --packetcount; + } + } +#ifdef VERYDEBUG + cout << "header:" << (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset) << endl; +#endif + break; + default: + + break; + + } + + + // cout<<"*********** "<fnum)<push(buffer[ithread])); +#ifdef VERYDEBUG + if(!ithread) cout << ithread << " *** pushed into listening fifo" << endl; +#endif + } + + sem_wait(&listensmp[ithread]); + + //make sure its not exiting thread + if(killAllListeningThreads){ + cout << ithread << " good bye listening thread" << endl; + if(tempchar) {delete [] tempchar;tempchar = NULL;} + pthread_exit(NULL); + } + } + + return OK; +} + + + + + + + + + + + + + +int UDPStandardImplementation::startWriting(){ + int ithread = currentWriterThreadIndex; +#ifdef VERYVERBOSE + cout << ithread << "In startWriting()" <pop(wbuf[i]); + numpackets = (uint16_t)(*((uint16_t*)wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; +#endif + } + +#ifdef VERYDEBUG + cout << ithread << " numpackets:" << dec << numpackets << endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl; + cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl; +#endif + + + //last dummy packet + if(numpackets == 0xFFFF){ + stopWriting(ithread,wbuf); + continue; + } + + + + + //for progress + if(myDetectorType == EIGER){ + tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); + tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 + }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + pthread_mutex_lock(&progress_mutex); + if(tempframenum > currframenum) + currframenum = tempframenum; + pthread_mutex_unlock(&progress_mutex); + } +//#ifdef VERYDEBUG + if(myDetectorType == EIGER) + cout << endl < 0){ + for(i=0;ipush(wbuf[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl; +#endif + } + + + } + else{ + //copy to gui + copyFrameToGui(NULL,-1,wbuf[0]+HEADER_SIZE_NUM_TOT_PACKETS); +#ifdef VERYVERBOSE + cout << ithread << " finished copying" << endl; +#endif + while(!fifoFree[0]->push(wbuf[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuf[0]<fnum); + //gotthard has +1 for frame number and not a short frame + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + startFrameIndex = (((((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1) + & (frameIndexMask)) >> frameIndexOffset); + else + startFrameIndex = ((((uint32_t)(*((uint32_t*)(buffer[ithread]+HEADER_SIZE_NUM_TOT_PACKETS)))) + & (frameIndexMask)) >> frameIndexOffset); + + + //start of acquisition + if(!acqStarted){ + startAcquisitionIndex=startFrameIndex; + currframenum = startAcquisitionIndex; + acqStarted = true; + cout << "startAcquisitionIndex:" << startAcquisitionIndex<push(buffer[ithread]); + exit(-1); + } + //push the last buffer into fifo + if(rc > 0){ + pc = (rc/onePacketSize); +#ifdef VERYDEBUG + cout << ithread << " *** last packetcount:" << pc << endl; +#endif + (*((uint16_t*)(buffer[ithread]))) = pc; + totalListeningFrameCount[ithread] += pc; + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " *** last lbuf1:" << (void*)buffer[ithread] << endl; +#endif + } + + + //push dummy buffer to all writer threads + for(i=0;ipop(buffer[ithread]); + (*((uint16_t*)(buffer[ithread]))) = 0xFFFF; +#ifdef VERYDEBUG + cout << ithread << " going to push in dummy buffer:" << (void*)buffer[ithread] << " with num packets:"<< (*((uint16_t*)(buffer[ithread]))) << endl; +#endif + while(!fifo[ithread]->push(buffer[ithread])); +#ifdef VERYDEBUG + cout << ithread << " pushed in dummy buffer:" << (void*)buffer[ithread] << endl; +#endif + } + + //reset mask and exit loop + pthread_mutex_lock(&status_mutex); + listeningthreads_mask^=(1< 1) + cout << "Waiting for listening to be done.. current mask:" << hex << listeningthreads_mask << endl; +#endif + while(listeningthreads_mask) + usleep(5000); +#ifdef VERYDEBUG + t = 0; + for(i=0;ipush(wbuffer[i])); +#ifdef VERYDEBUG + cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl; +#endif + } + + + + //all threads need to close file, reset mask and exit loop + closeFile(ithread); + pthread_mutex_lock(&status_mutex); + writerthreads_mask^=(1< 0){ + + //for progress and packet loss calculation(new files) + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + + //lock + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + + + //to create new file when max reached + packetsToSave = maxPacketsPerFile - packetsInFile; + if(packetsToSave > numpackets) + packetsToSave = numpackets; +/**next time offset is still plus header length*/ + fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd); + packetsInFile += packetsToSave; + packetsCaught += packetsToSave; + totalPacketsCaught += packetsToSave; + + + //new file + if(packetsInFile >= maxPacketsPerFile){ + //for packet loss + lastpacket = (((packetsToSave - 1) * onePacketSize) + offset); + if(myDetectorType == EIGER); + else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) + tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset); + else + tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset); + + if(numWriterThreads == 1) + currframenum = tempframenum; + else{ + if(tempframenum > currframenum) + currframenum = tempframenum; + } +#ifdef VERYDEBUG + cout << "tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl; +#endif + //create + createNewFile(); + } + + //unlock + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + + + offset += (packetsToSave * onePacketSize); + numpackets -= packetsToSave; + } + + } + else{ + if(numWriterThreads > 1) + pthread_mutex_lock(&write_mutex); + packetsInFile += numpackets; + packetsCaught += numpackets; + totalPacketsCaught += numpackets; + if(numWriterThreads > 1) + pthread_mutex_unlock(&write_mutex); + } +} + + + + + + + + + + + + + + +void UDPStandardImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ + +#if defined(MYROOT1) && defined(ALLFILE_DEBUG) + writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); +#endif + + eventType thisEvent = PEDESTAL; + int ndata; + char* buff = 0; + data = wbuffer[0]+ HEADER_SIZE_NUM_TOT_PACKETS; + int remainingsize = npackets * onePacketSize; + int np; + int once = 0; + double tot, tl, tr, bl, br; + int xmin = 1, ymin = 1, ix, iy; + + + while(buff = receiverdata[ithread]->findNextFrame(data,ndata,remainingsize)){ + np = ndata/onePacketSize; + + //cout<<"buff framnum:"<> frameIndexOffset)<newFrame(); + + //only for moench + if(commonModeSubtractionEnable){ + for(ix = xmin - 1; ix < xmax+1; ix++){ + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent = singlePhotonDet[ithread]->getEventType(buff, ix, iy, 0); + } + } + } + + + for(ix = xmin - 1; ix < xmax+1; ix++) + for(iy = ymin - 1; iy < ymax+1; iy++){ + thisEvent=singlePhotonDet[ithread]->getEventType(buff, ix, iy, commonModeSubtractionEnable); + if (nf>1000) { + tot=0; + tl=0; + tr=0; + bl=0; + br=0; + if (thisEvent==PHOTON_MAX) { + receiverdata[ithread]->getFrameNumber(buff); + //iFrame=receiverdata[ithread]->getFrameNumber(buff); +#ifdef MYROOT1 + myTree[ithread]->Fill(); + //cout << "Fill in event: frmNr: " << iFrame << " ix " << ix << " iy " << iy << " type " << thisEvent << endl; +#else + pthread_mutex_lock(&write_mutex); + if((enableFileWrite) && (sfilefd)) + singlePhotonDet[ithread]->writeCluster(sfilefd); + pthread_mutex_unlock(&write_mutex); +#endif + } + } + } + + nf++; +#ifndef ALLFILE + pthread_mutex_lock(&progress_mutex); + packetsInFile += packetsPerFrame; + packetsCaught += packetsPerFrame; + totalPacketsCaught += packetsPerFrame; + if(packetsInFile >= maxPacketsPerFile) + createNewFile(); + pthread_mutex_unlock(&progress_mutex); + +#endif + if(!once){ + copyFrameToGui(NULL,-1,buff); + once = 1; + } + } + + remainingsize -= ((buff + ndata) - data); + data = buff + ndata; + if(data > (wbuffer[0] + HEADER_SIZE_NUM_TOT_PACKETS + npackets * onePacketSize) ) + cout <<" **************ERROR SHOULD NOT COME HERE, Error 142536!"<push(wbuffer[0])); +#ifdef VERYVERBOSE + cout<<"buf freed:"<<(void*)wbuffer[0]<= 0){ + + tengigaEnable = enable; + + if(myDetectorType == EIGER){ + + if(!tengigaEnable){ + packetsPerFrame = EIGER_ONE_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_ONE_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + }else{ + packetsPerFrame = EIGER_TEN_GIGA_CONSTANT * dynamicRange * EIGER_MAX_PORTS; + onePacketSize = EIGER_TEN_GIGA_ONE_PACKET_SIZE; + maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame*4; + } + frameSize = onePacketSize * packetsPerFrame; + bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//everything one port gets (img header plus packets) + //maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame; + + + cout<<"packetsPerFrame:"< +#include +#endif + + +#include +#include +#include +#include + + +/** + * @short does all the functions for a receiver, set/get parameters, start/stop etc. + */ + + +class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBaseImplementation { + public: + /** + * Constructor + */ +UDPStandardImplementation(); + + /** + * Destructor + */ + virtual ~UDPStandardImplementation(); + + + + /** + * delete and free member parameters + */ + void deleteMembers(); + + /** + * initialize member parameters + */ + void initializeMembers(); + + /** + * Set receiver type + * @param det detector type + * Returns success or FAIL + */ + int setDetectorType(detectorType det); + + + //Frame indices and numbers caught + /** + * Returns current Frame Index Caught for an entire acquisition (including all scans) + */ + //uint32_t getAcquisitionIndex(); + + /** + * Returns if acquisition started + */ + //bool getAcquistionStarted(); + + /** + * Returns Frames Caught for each real time acquisition (eg. for each scan) + */ + //int getFramesCaught(); + + /** + * Returns Total Frames Caught for an entire acquisition (including all scans) + */ + //int getTotalFramesCaught(); + + /** + * Returns the frame index at start of each real time acquisition (eg. for each scan) + */ + //uint32_t getStartFrameIndex(); + + /** + * Returns current Frame Index for each real time acquisition (eg. for each scan) + */ + //uint32_t getFrameIndex(); + + /** + * Returns if measurement started + */ + //bool getMeasurementStarted(); + + /** + * Resets the Total Frames Caught + * This is how the receiver differentiates between entire acquisitions + * Returns 0 + */ + //void resetTotalFramesCaught(); + + + //file parameters + /** + * Returns File Path + */ + //char* getFilePath() const; + + /** + * Set File Path + * @param c file path + */ + //char* setFilePath(const char c[]); + + /** + * Returns File Name + */ + //char* getFileName() const; + + /** + * Set File Name (without frame index, file index and extension) + * @param c file name + */ + //char* setFileName(const char c[]); + + /** + * Returns File Index + */ + //int getFileIndex(); + + /** + * Set File Index + * @param i file index + */ + //int setFileIndex(int i); + + /** + * Set Frame Index Needed + * @param i frame index needed + */ + //int setFrameIndexNeeded(int i); + + /** + * Set enable file write + * @param i file write enable + * Returns file write enable + */ + //int setEnableFileWrite(int i); + + /** + * Enable/disable overwrite + * @param i enable + * Returns enable over write + */ + //int setEnableOverwrite(int i); + + /** + * Returns file write enable + * 1: YES 0: NO + */ + //int getEnableFileWrite() const; + + /** + * Returns file over write enable + * 1: YES 0: NO + */ + //int getEnableOverwrite() const; + +//other parameters + + /** + * abort acquisition with minimum damage: close open files, cleanup. + * does nothing if state already is 'idle' + */ + void abort() {}; + + /** + * Returns status of receiver: idle, running or error + */ + runStatus getStatus() const; + + /** + * Set detector hostname + * @param c hostname + */ + void initialize(const char *detectorHostName); + + /* Returns detector hostname + /returns hostname + * caller needs to deallocate the returned char array. + * if uninitialized, it must return NULL + */ + char *getDetectorHostname() const; + + /** + * Set Ethernet Interface or IP to listen to + */ + void setEthernetInterface(char* c); + + /** + * Set UDP Port Number + */ + void setUDPPortNo(int p); + + /* + * Returns number of frames to receive + * This is the number of frames to expect to receiver from the detector. + * The data receiver will change from running to idle when it got this number of frames + */ + int getNumberOfFrames() const; + + /** + * set frame number if a positive number + */ + int32_t setNumberOfFrames(int32_t fnum); + + /** + * Returns scan tag + */ + int getScanTag() const; + + /** + * set scan tag if its is a positive number + */ + int32_t setScanTag(int32_t stag); + + /** + * Returns the number of bits per pixel + */ + int getDynamicRange() const; + + /** + * set dynamic range if its is a positive number + */ + int32_t setDynamicRange(int32_t dr); + + /** + * Set short frame + * @param i if shortframe i=1 + */ + int setShortFrame(int i); + + /** + * Set the variable to send every nth frame to gui + * or if 0,send frame only upon gui request + */ + int setNFrameToGui(int i); + + /** set acquisition period if a positive number + */ + int64_t setAcquisitionPeriod(int64_t index); + + /** get data compression, by saving only hits + */ + bool getDataCompression(); + + /** enabl data compression, by saving only hits + /returns if failed + */ + int enableDataCompression(bool enable); + + /** + * enable 10Gbe + @param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out + \returns enable for 10Gbe + */ + int enableTenGiga(int enable = -1); + + + +//other functions + + /** + * Returns the buffer-current frame read by receiver + * @param c pointer to current file name + * @param raw address of pointer, pointing to current frame to send to gui + * @param fnum frame number for eiger as it is not in the packet + */ + void readFrame(char* c,char** raw, uint32_t &fnum); + + /** + * Closes all files + * @param ithr thread index + */ + void closeFile(int ithr = -1); + + /** + * Starts Receiver - starts to listen for packets + * @param message is the error message if there is an error + * Returns success + */ + int startReceiver(char message[]); + + /** + * Stops Receiver - stops listening for packets + * Returns success + */ + int stopReceiver(); + + /** set status to transmitting and + * when fifo is empty later, sets status to run_finished + */ + void startReadout(); + + /** + * shuts down the udp sockets + * \returns if success or fail + */ + int shutDownUDPSockets(); + +private: + + /* + void not_implemented(string method_name){ + std::cout << "[WARNING] Method " << method_name << " not implemented!" << std::endl; + }; + */ + /** + * Deletes all the filter objects for single photon data + */ + void deleteFilter(); + + /** + * Constructs the filter for single photon data + */ + void setupFilter(); + + /** + * set up fifo according to the new numjobsperthread + */ + void setupFifoStructure (); + + /** + * Copy frames to gui + * uses semaphore for nth frame mode + */ + void copyFrameToGui(char* startbuf[], uint32_t fnum=-1, char* buf=NULL); + + /** + * creates udp sockets + * \returns if success or fail + */ + int createUDPSockets(); + + /** + * create listening thread + * @param destroy is true to kill all threads and start again + */ + int createListeningThreads(bool destroy = false); + + /** + * create writer threads + * @param destroy is true to kill all threads and start again + */ + int createWriterThreads(bool destroy = false); + + /** + * set thread priorities + */ + void setThreadPriorities(); + + /** + * initializes variables and creates the first file + * also does the startAcquisitionCallBack + * \returns FAIL or OK + */ + int setupWriter(); + + /** + * Creates new tree and file for compression + * @param ithr thread number + * @param iframe frame number + *\returns OK for succces or FAIL for failure + */ + int createCompressionFile(int ithr, int iframe); + + /** + * Creates new file + *\returns OK for succces or FAIL for failure + */ + int createNewFile(); + + /** + * Static function - Thread started which listens to packets. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startListeningThread(void *this_pointer); + + /** + * Static function - Thread started which writes packets to file. + * Called by startReceiver() + * @param this_pointer pointer to this object + */ + static void* startWritingThread(void *this_pointer); + + /** + * Thread started which listens to packets. + * Called by startReceiver() + * + */ + int startListening(); + + /** + * Thread started which writes packets to file. + * Called by startReceiver() + * + */ + int startWriting(); + + /** + * Writing to file without compression + * @param buf is the address of buffer popped out of fifo + * @param numpackets is the number of packets + * @param framenum current frame number + */ + void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum); + + /** + * Its called for the first packet of a scan or acquistion + * Sets the startframeindices and the variables to know if acquisition started + * @param ithread listening thread number + */ + void startFrameIndices(int ithread); + + /** + * This is called when udp socket is shut down + * It pops ffff instead of packet number into fifo + * to inform writers about the end of listening session + * @param ithread listening thread number + * @param rc number of bytes received + * @param pc packet count + * @param t total packets listened to + */ + void stopListening(int ithread, int rc, int &pc, int &t); + + /** + * When acquisition is over, this is called + * @param ithread listening thread number + * @param wbuffer writer buffer + */ + void stopWriting(int ithread, char* wbuffer[]); + + + /** + * data compression for each fifo output + * @param ithread listening thread number + * @param wbuffer writer buffer + * @param npackets number of packets from the fifo + * @param data pointer to the next packet start + * @param xmax max pixels in x direction + * @param ymax max pixels in y direction + * @param nf nf + */ + void handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf); + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char header_before[20]; + unsigned char fnum[4]; + unsigned char header_after[24]; + } eiger_image_header; + + + /** structure of an eiger image header*/ + typedef struct + { + unsigned char num1[4]; + unsigned char num2[4]; + } eiger_packet_header; + + /** max number of listening threads */ + const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS; + + /** max number of writer threads */ + const static int MAX_NUM_WRITER_THREADS = 15; + + /** detector type */ + detectorType myDetectorType; + + /** detector hostname */ + char detHostname[MAX_STR_LENGTH]; + + /** status of receiver */ + runStatus status; + + /** UDP Socket between Receiver and Detector */ + genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; + + /** Server UDP Port*/ + int server_port[MAX_NUM_LISTENING_THREADS]; + + /** ethernet interface or IP to listen to */ + char *eth; + + /** max packets per file **/ + int maxPacketsPerFile; + + /** File write enable */ + int enableFileWrite; + + /** File over write enable */ + int overwrite; + + /** Complete File name */ + char savefilename[MAX_STR_LENGTH]; + + /** File Name without frame index, file index and extension*/ + char fileName[MAX_STR_LENGTH]; + + /** File Path */ + char filePath[MAX_STR_LENGTH]; + + /** File Index */ + int fileIndex; + + /** scan tag */ + int scanTag; + + /** if frame index required in file name */ + int frameIndexNeeded; + + /* Acquisition started */ + bool acqStarted; + + /* Measurement started */ + bool measurementStarted; + + /** Frame index at start of each real time acquisition (eg. for each scan) */ + uint32_t startFrameIndex; + + /** Actual current frame index of each time acquisition (eg. for each scan) */ + uint32_t frameIndex; + + /** Frames Caught for each real time acquisition (eg. for each scan) */ + int packetsCaught; + + /** Total packets caught for an entire acquisition (including all scans) */ + int totalPacketsCaught; + + /** Pckets currently in current file, starts new file when it reaches max */ + int packetsInFile; + + /** Frame index at start of an entire acquisition (including all scans) */ + uint32_t startAcquisitionIndex; + + /** Actual current frame index of an entire acquisition (including all scans) */ + uint32_t acquisitionIndex; + + /** number of packets per frame*/ + int packetsPerFrame; + + /** frame index mask */ + uint32_t frameIndexMask; + + /** packet index mask */ + uint32_t packetIndexMask; + + /** frame index offset */ + int frameIndexOffset; + + /** acquisition period */ + int64_t acquisitionPeriod; + + /** frame number */ + int32_t numberOfFrames; + + /** dynamic range */ + int dynamicRange; + + /** short frames */ + int shortFrame; + + /** current frame number */ + uint32_t currframenum; + + /** Previous Frame number from buffer */ + uint32_t prevframenum; + + /** size of one frame */ + int frameSize; + + /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ + int bufferSize; + + /** oen buffer size */ + int onePacketSize; + + /** latest data */ + char* latestData; + + /** gui data ready */ + int guiDataReady; + + /** points to the data to send to gui */ + char* guiData; + + /** points to the filename to send to gui */ + char* guiFileName; + + /** temporary number for eiger frame number as its not included in the packet */ + uint32_t guiFrameNumber; + + /** send every nth frame to gui or only upon gui request*/ + int nFrameToGui; + + /** fifo size */ + unsigned int fifosize; + + /** number of jobs per thread for data compression */ + int numJobsPerThread; + + /** datacompression - save only hits */ + bool dataCompression; + + /** memory allocated for the buffer */ + char *mem0[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data read */ + CircularFifo* fifo[MAX_NUM_LISTENING_THREADS]; + + /** circular fifo to store addresses of data already written and ready to be resued*/ + CircularFifo* fifoFree[MAX_NUM_LISTENING_THREADS]; + + /** Receiver buffer */ + char *buffer[MAX_NUM_LISTENING_THREADS]; + + /** number of writer threads */ + int numListeningThreads; + + /** number of writer threads */ + int numWriterThreads; + + /** to know if listening and writer threads created properly */ + int thread_started; + + /** current listening thread index*/ + int currentListeningThreadIndex; + + /** current writer thread index*/ + int currentWriterThreadIndex; + + /** thread listening to packets */ + pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; + + /** thread writing packets */ + pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; + + /** total frame count the listening thread has listened to */ + int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; + + /** mask showing which listening threads are running */ + volatile uint32_t listeningthreads_mask; + + /** mask showing which writer threads are running */ + volatile uint32_t writerthreads_mask; + + /** mask showing which threads have created files*/ + volatile uint32_t createfile_mask; + + /** OK if file created was successful */ + int ret_createfile; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllListeningThreads; + + /** variable used to self terminate threads waiting for semaphores */ + int killAllWritingThreads; + + /** 10Gbe enable*/ + int tengigaEnable; + + + + +//semaphores + /** semaphore to synchronize writer and guireader threads */ + sem_t smp; + /** semaphore to synchronize listener threads */ + sem_t listensmp[MAX_NUM_LISTENING_THREADS]; + /** semaphore to synchronize writer threads */ + sem_t writersmp[MAX_NUM_WRITER_THREADS]; + + +//mutex + /** guiDataReady mutex */ + pthread_mutex_t dataReadyMutex; + + /** mutex for status */ + pthread_mutex_t status_mutex; + + /** mutex for progress variable currframenum */ + pthread_mutex_t progress_mutex; + + /** mutex for writing data to file */ + pthread_mutex_t write_mutex; + + /** File Descriptor */ + FILE *sfilefd; + + //filter + singlePhotonDetector *singlePhotonDet[MAX_NUM_WRITER_THREADS]; + slsReceiverData *receiverdata[MAX_NUM_WRITER_THREADS]; + moenchCommonMode *cmSub; + bool commonModeSubtractionEnable; + +#ifdef MYROOT1 + /** Tree where the hits are stored */ + TTree *myTree[MAX_NUM_WRITER_THREADS]; + + /** File where the tree is saved */ + TFile *myFile[MAX_NUM_WRITER_THREADS]; +#endif + + + + /** + callback arguments are + filepath + filename + fileindex + data size + + return value is + 0 callback takes care of open,close,write file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + + */ + int (*startAcquisitionCallBack)(char*, char*,int, int, void*); + void *pStartAcquisition; + + /** + args to acquisition finished callback + total frames caught + + */ + void (*acquisitionFinishedCallBack)(int, void*); + void *pAcquisitionFinished; + + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void (*rawDataReadyCallBack)(int, char*, int, FILE*, char*, void*); + void *pRawDataReady; + + /** The action which decides what the user and default responsibilites to save data are + * 0 raw data ready callback takes care of open,close,write file + * 1 callback writes file, we have to open, close it + * 2 we open, close, write file, callback does not do anything */ + int cbAction; + + +public: + + + /** + callback arguments are + filepath + filename + fileindex + datasize + + return value is + 0 callback takes care of open,close,wrie file + 1 callback writes file, we have to open, close it + 2 we open, close, write file, callback does not do anything + */ + void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;}; + + /** + callback argument is + toatal frames caught + */ + void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;}; + + /** + args to raw data ready callback are + framenum + datapointer + datasize in bytes + file descriptor + guidatapointer (NULL, no data required) + */ + void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;}; +}; + + +#endif + +//#endif