Merge branch 'eiger_receiver' of gitorious.psi.ch:sls_det_software/sls_receiver_software into eiger_receiver

This commit is contained in:
2014-11-17 15:52:43 +01:00
20 changed files with 604 additions and 1200 deletions

View File

@ -1,4 +1,3 @@
include ../Makefile.include include ../Makefile.include
DESTDIR ?= ../bin DESTDIR ?= ../bin

View File

@ -1,9 +1,9 @@
Path: slsDetectorsPackage/slsReceiverSoftware Path: slsDetectorsPackage/slsReceiverSoftware
URL: origin git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git URL: origin git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git
Repository Root: origin git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git Repository Root: origin git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git
Repsitory UUID: 1c259aeba8b068b9f6e550d63a9a3a14bd7d3ab7 Repsitory UUID: e019a6ce7d96d4ac9cb5762b7137245aedb4d5b8
Revision: 6 Revision: 22
Branch: master Branch: master
Last Changed Author: Maliakal_Dhanya Last Changed Author: Anna_Bergamaschi
Last Changed Rev: 6 Last Changed Rev: 22
Last Changed Date: 2014-06-03 12:06:57 +0200 Last Changed Date: 2014-10-15 09:22:40 +0200

View File

@ -25,6 +25,7 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <exception> #include <exception>
#include <unistd.h>
@ -35,19 +36,21 @@ using namespace std;
class RestHelper { class RestHelper {
public: public:
RestHelper(int timeout=10, int n_tries=10){ RestHelper(int timeout=10, int n_tries=1){
/** /**
* *
* *
* @param timeout default=10 * @param timeout default=10
* @param n_tries default=3 * @param n_tries default=1
*/ */
http_timeout = timeout; http_timeout = timeout;
n_connection_tries = n_tries; n_connection_tries = n_tries;
} }
~RestHelper(){}; ~RestHelper(){
delete session;
};
void set_connection_params(int timeout, int n_tries){ void set_connection_params(int timeout, int n_tries){
@ -74,14 +77,10 @@ class RestHelper {
*/ */
//Check for http:// string //Check for http:// string
FILE_LOG(logDEBUG) << __func__ << " starting";
string proto_str = "http://"; string proto_str = "http://";
if( size_t found = hostname.find(proto_str) != string::npos ){ if( size_t found = hostname.find(proto_str) != string::npos ){
cout << hostname << endl;
char c1[hostname.size()-found-1]; char c1[hostname.size()-found-1];
cout << c1 << endl;
size_t length1 = hostname.copy(c1, hostname.size()-found-1, proto_str.size()); size_t length1 = hostname.copy(c1, hostname.size()-found-1, proto_str.size());
c1[length1]='\0'; c1[length1]='\0';
hostname = c1; hostname = c1;
@ -168,7 +167,7 @@ class RestHelper {
string answer; string answer;
int code = send_request(session, req, &answer); int code = send_request(session, req, &answer);
if(code == 0 ) { if(code == 0 ) {
FILE_LOG(logDEBUG) << "ANSWER " << answer; FILE_LOG(logDEBUG) << __AT__ << " REQUEST: " << " ANSWER: " << answer;
json_value->loadFromString(answer); json_value->loadFromString(answer);
} }
delete uri; delete uri;
@ -192,7 +191,6 @@ class RestHelper {
if (path.empty()) path = "/"; if (path.empty()) path = "/";
HTTPRequest req(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1 ); HTTPRequest req(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1 );
req.setContentType("application/json\r\n"); req.setContentType("application/json\r\n");
cout << "REQUEST BODY " << request_body << endl;
req.setContentLength( request_body.length() ); req.setContentLength( request_body.length() );
int code = send_request(session, req, answer, request_body); int code = send_request(session, req, answer, request_body);
@ -266,9 +264,8 @@ class RestHelper {
if (request_body == "") if (request_body == "")
session->sendRequest( (req) ); session->sendRequest( (req) );
else{ else{
cout << request_body << endl; ostream &os = session->sendRequest( req ) ;
ostream &os = session->sendRequest( req ) ; os << request_body;
os << request_body;
} }
HTTPResponse res; HTTPResponse res;
@ -276,7 +273,7 @@ class RestHelper {
StreamCopier::copyToString(is, *answer); StreamCopier::copyToString(is, *answer);
code = res.getStatus(); code = res.getStatus();
if (code != 200){ if (code != 200){
cout << "HTTP ERROR " << res.getStatus() << ": " << res.getReason() << endl; FILE_LOG(logERROR) << "HTTP ERROR " << res.getStatus() << ": " << res.getReason() ;
code = -1; code = -1;
} }
else else
@ -290,7 +287,8 @@ class RestHelper {
n+=1; n+=1;
} }
return code; throw std::string("Cannot connect to the REST server! Please check...");
//return code;
} }
}; };

View File

@ -213,6 +213,7 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
* Set UDP Port Number * Set UDP Port Number
*/ */
void setUDPPortNo(int p); void setUDPPortNo(int p);
void setUDPPortNo2(int p);
/* /*
* Returns number of frames to receive * Returns number of frames to receive
@ -287,9 +288,9 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter
* @param c pointer to current file name * @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui * @param raw address of pointer, pointing to current frame to send to gui
* @param fnum frame number for eiger as it is not in the packet * @param fnum frame number for eiger as it is not in the packet
* @param fstartind is the start index of the acquisition
*/ */
void readFrame(char* c,char** raw, uint32_t &fnum); void readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind);
/** /**
* Closes all files * Closes all files
* @param ithr thread index * @param ithr thread index
@ -777,6 +778,8 @@ protected:
* 2 we open, close, write file, callback does not do anything */ * 2 we open, close, write file, callback does not do anything */
int cbAction; int cbAction;
/** true if bottom half module for eiger */
bool bottom;
public: public:

View File

@ -282,6 +282,11 @@ class UDPInterface {
*/ */
virtual void setUDPPortNo(int p) = 0; virtual void setUDPPortNo(int p) = 0;
/**
* Set UDP Port Number
*/
virtual void setUDPPortNo2(int p) = 0;
/** /**
* Set Ethernet Interface or IP to listen to * Set Ethernet Interface or IP to listen to
*/ */
@ -323,8 +328,9 @@ class UDPInterface {
* @param c pointer to current file name * @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui * @param raw address of pointer, pointing to current frame to send to gui
* @param fnum frame number for eiger as it is not in the packet * @param fnum frame number for eiger as it is not in the packet
* @param fstartind is the start index of the acquisition
*/ */
virtual void readFrame(char* c,char** raw, uint32_t &fnum) = 0; virtual void readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind ) = 0;
/** set status to transmitting and /** set status to transmitting and
* when fifo is empty later, sets status to run_finished * when fifo is empty later, sets status to run_finished

View File

@ -50,6 +50,8 @@ class UDPRESTImplementation : protected virtual slsReceiverDefs, public UDPBaseI
protected: protected:
void initialize_REST(); void initialize_REST();
int get_rest_state(RestHelper * rest, string *rest_state);
public: public:
void configure(map<string, string> config_map); void configure(map<string, string> config_map);
@ -221,6 +223,7 @@ class UDPRESTImplementation : protected virtual slsReceiverDefs, public UDPBaseI
* Set UDP Port Number * Set UDP Port Number
*/ */
void setUDPPortNo(int p); void setUDPPortNo(int p);
void setUDPPortNo2(int p);
/* /*
* Returns number of frames to receive * Returns number of frames to receive
@ -298,7 +301,7 @@ class UDPRESTImplementation : protected virtual slsReceiverDefs, public UDPBaseI
* @param raw address of pointer, pointing to current frame to send to gui * @param raw address of pointer, pointing to current frame to send to gui
* @param fnum frame number for eiger as it is not in the packet * @param fnum frame number for eiger as it is not in the packet
*/ */
void readFrame(char* c,char** raw, uint32_t &fnum); void readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind);
/** /**
* Closes all files * Closes all files
@ -500,233 +503,233 @@ private:
const static int MAX_NUM_WRITER_THREADS = 15; const static int MAX_NUM_WRITER_THREADS = 15;
/** detector type */ /** detector type */
detectorType myDetectorType; //detectorType myDetectorType;
/** detector hostname */ /** detector hostname */
char detHostname[MAX_STR_LENGTH]; //char detHostname[MAX_STR_LENGTH];
/** status of receiver */ /** status of receiver */
runStatus status; //runStatus status;
/** UDP Socket between Receiver and Detector */ /** UDP Socket between Receiver and Detector */
genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS]; //genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS];
/** Server UDP Port*/ /** Server UDP Port*/
int server_port[MAX_NUM_LISTENING_THREADS]; //int server_port[MAX_NUM_LISTENING_THREADS];
/** ethernet interface or IP to listen to */ /** ethernet interface or IP to listen to */
char *eth; //char *eth;
/** max packets per file **/ /** max packets per file **/
int maxPacketsPerFile; //int maxPacketsPerFile;
/** File write enable */ /** File write enable */
int enableFileWrite; //int enableFileWrite;
/** File over write enable */ /** File over write enable */
int overwrite; //int overwrite;
/** Complete File name */ /** Complete File name */
char savefilename[MAX_STR_LENGTH]; //char savefilename[MAX_STR_LENGTH];
/** File Name without frame index, file index and extension*/ /** File Name without frame index, file index and extension*/
char fileName[MAX_STR_LENGTH]; //char fileName[MAX_STR_LENGTH];
/** File Path */ /** File Path */
char filePath[MAX_STR_LENGTH]; //char filePath[MAX_STR_LENGTH];
/** File Index */ /** File Index */
int fileIndex; //int fileIndex;
/** scan tag */ /** scan tag */
int scanTag; //int scanTag;
/** if frame index required in file name */ /** if frame index required in file name */
int frameIndexNeeded; //int frameIndexNeeded;
/* Acquisition started */ /* Acquisition started */
bool acqStarted; //bool acqStarted;
/* Measurement started */ /* Measurement started */
bool measurementStarted; //bool measurementStarted;
/** Frame index at start of each real time acquisition (eg. for each scan) */ /** Frame index at start of each real time acquisition (eg. for each scan) */
uint32_t startFrameIndex; //uint32_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */ /** Actual current frame index of each time acquisition (eg. for each scan) */
uint32_t frameIndex; //uint32_t frameIndex;
/** Frames Caught for each real time acquisition (eg. for each scan) */ /** Frames Caught for each real time acquisition (eg. for each scan) */
int packetsCaught; //int packetsCaught;
/** Total packets caught for an entire acquisition (including all scans) */ /** Total packets caught for an entire acquisition (including all scans) */
int totalPacketsCaught; //int totalPacketsCaught;
/** Pckets currently in current file, starts new file when it reaches max */ /** Pckets currently in current file, starts new file when it reaches max */
int packetsInFile; //int packetsInFile;
/** Frame index at start of an entire acquisition (including all scans) */ /** Frame index at start of an entire acquisition (including all scans) */
uint32_t startAcquisitionIndex; //uint32_t startAcquisitionIndex;
/** Actual current frame index of an entire acquisition (including all scans) */ /** Actual current frame index of an entire acquisition (including all scans) */
uint32_t acquisitionIndex; //uint32_t acquisitionIndex;
/** number of packets per frame*/ /** number of packets per frame*/
int packetsPerFrame; //int packetsPerFrame;
/** frame index mask */ /** frame index mask */
uint32_t frameIndexMask; //uint32_t frameIndexMask;
/** packet index mask */ /** packet index mask */
uint32_t packetIndexMask; //uint32_t packetIndexMask;
/** frame index offset */ /** frame index offset */
int frameIndexOffset; //int frameIndexOffset;
/** acquisition period */ /** acquisition period */
int64_t acquisitionPeriod; //int64_t acquisitionPeriod;
/** frame number */ /** frame number */
int32_t numberOfFrames; //int32_t numberOfFrames;
/** dynamic range */ /** dynamic range */
int dynamicRange; //int dynamicRange;
/** short frames */ /** short frames */
int shortFrame; //int shortFrame;
/** current frame number */ /** current frame number */
uint32_t currframenum; //uint32_t currframenum;
/** Previous Frame number from buffer */ /** Previous Frame number from buffer */
uint32_t prevframenum; //uint32_t prevframenum;
/** size of one frame */ /** size of one frame */
int frameSize; //int frameSize;
/** buffer size. different from framesize as we wait for one packet instead of frame for eiger */ /** buffer size. different from framesize as we wait for one packet instead of frame for eiger */
int bufferSize; //int bufferSize;
/** oen buffer size */ /** oen buffer size */
int onePacketSize; //int onePacketSize;
/** latest data */ /** latest data */
char* latestData; //char* latestData;
/** gui data ready */ /** gui data ready */
int guiDataReady; //int guiDataReady;
/** points to the data to send to gui */ /** points to the data to send to gui */
char* guiData; //char* guiData;
/** points to the filename to send to gui */ /** points to the filename to send to gui */
char* guiFileName; //char* guiFileName;
/** temporary number for eiger frame number as its not included in the packet */ /** temporary number for eiger frame number as its not included in the packet */
uint32_t guiFrameNumber; //uint32_t guiFrameNumber;
/** send every nth frame to gui or only upon gui request*/ /** send every nth frame to gui or only upon gui request*/
int nFrameToGui; //int nFrameToGui;
/** fifo size */ /** fifo size */
unsigned int fifosize; //unsigned int fifosize;
/** number of jobs per thread for data compression */ /** number of jobs per thread for data compression */
int numJobsPerThread; //int numJobsPerThread;
/** datacompression - save only hits */ /** datacompression - save only hits */
bool dataCompression; //bool dataCompression;
/** memory allocated for the buffer */ /** memory allocated for the buffer */
char *mem0[MAX_NUM_LISTENING_THREADS]; //char *mem0[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data read */ /** circular fifo to store addresses of data read */
CircularFifo<char>* fifo[MAX_NUM_LISTENING_THREADS]; //CircularFifo<char>* fifo[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data already written and ready to be resued*/ /** circular fifo to store addresses of data already written and ready to be resued*/
CircularFifo<char>* fifoFree[MAX_NUM_LISTENING_THREADS]; //CircularFifo<char>* fifoFree[MAX_NUM_LISTENING_THREADS];
/** Receiver buffer */ /** Receiver buffer */
char *buffer[MAX_NUM_LISTENING_THREADS]; //char *buffer[MAX_NUM_LISTENING_THREADS];
/** number of writer threads */ /** number of writer threads */
int numListeningThreads; //intt numListeningThreads;
/** number of writer threads */ /** number of writer threads */
int numWriterThreads; //int numWriterThreads;
/** to know if listening and writer threads created properly */ /** to know if listening and writer threads created properly */
int thread_started; //int thread_started;
/** current listening thread index*/ /** current listening thread index*/
int currentListeningThreadIndex; //int currentListeningThreadIndex;
/** current writer thread index*/ /** current writer thread index*/
int currentWriterThreadIndex; //int currentWriterThreadIndex;
/** thread listening to packets */ /** thread listening to packets */
pthread_t listening_thread[MAX_NUM_LISTENING_THREADS]; //pthread_t listening_thread[MAX_NUM_LISTENING_THREADS];
/** thread writing packets */ /** thread writing packets */
pthread_t writing_thread[MAX_NUM_WRITER_THREADS]; //pthread_t writing_thread[MAX_NUM_WRITER_THREADS];
/** total frame count the listening thread has listened to */ /** total frame count the listening thread has listened to */
int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS]; //int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS];
/** mask showing which listening threads are running */ /** mask showing which listening threads are running */
volatile uint32_t listeningthreads_mask; //volatile uint32_t listeningthreads_mask;
/** mask showing which writer threads are running */ /** mask showing which writer threads are running */
volatile uint32_t writerthreads_mask; //volatile uint32_t writerthreads_mask;
/** mask showing which threads have created files*/ /** mask showing which threads have created files*/
volatile uint32_t createfile_mask; //volatile uint32_t createfile_mask;
/** OK if file created was successful */ /** OK if file created was successful */
int ret_createfile; //int ret_createfile;
/** variable used to self terminate threads waiting for semaphores */ /** variable used to self terminate threads waiting for semaphores */
int killAllListeningThreads; //int killAllListeningThreads;
/** variable used to self terminate threads waiting for semaphores */ /** variable used to self terminate threads waiting for semaphores */
int killAllWritingThreads; //int killAllWritingThreads;
/** 10Gbe enable*/ /** 10Gbe enable*/
int tengigaEnable; //int tengigaEnable;
//semaphores //semaphores
/** semaphore to synchronize writer and guireader threads */ /** semaphore to synchronize writer and guireader threads */
sem_t smp; //sem_t smp;
/** semaphore to synchronize listener threads */ /** semaphore to synchronize listener threads */
sem_t listensmp[MAX_NUM_LISTENING_THREADS]; //sem_t listensmp[MAX_NUM_LISTENING_THREADS];
/** semaphore to synchronize writer threads */ /** semaphore to synchronize writer threads */
sem_t writersmp[MAX_NUM_WRITER_THREADS]; //sem_t writersmp[MAX_NUM_WRITER_THREADS];
//mutex //mutex
/** guiDataReady mutex */ /** guiDataReady mutex */
pthread_mutex_t dataReadyMutex; //pthread_mutex_t dataReadyMutex;
/** mutex for status */ /** mutex for status */
pthread_mutex_t status_mutex; //pthread_mutex_t status_mutex;
/** mutex for progress variable currframenum */ /** mutex for progress variable currframenum */
pthread_mutex_t progress_mutex; //pthread_mutex_t progress_mutex;
/** mutex for writing data to file */ /** mutex for writing data to file */
pthread_mutex_t write_mutex; //pthread_mutex_t write_mutex;
/** File Descriptor */ /** File Descriptor */
FILE *sfilefd; //FILE *sfilefd;
//filter //filter
singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS]; //singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS];
slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS]; //slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS];
moenchCommonMode *cmSub; //moenchCommonMode *cmSub;
bool commonModeSubtractionEnable; //bool commonModeSubtractionEnable;
#ifdef MYROOT1 #ifdef MYROOT1
/** Tree where the hits are stored */ /** Tree where the hits are stored */

View File

@ -212,6 +212,10 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* Set UDP Port Number * Set UDP Port Number
*/ */
void setUDPPortNo(int p); void setUDPPortNo(int p);
/**
* Set UDP Port Number
*/
void setUDPPortNo2(int p);
/* /*
* Returns number of frames to receive * Returns number of frames to receive
@ -287,8 +291,7 @@ class UDPStandardImplementation: private virtual slsReceiverDefs, public UDPBase
* @param raw address of pointer, pointing to current frame to send to gui * @param raw address of pointer, pointing to current frame to send to gui
* @param fnum frame number for eiger as it is not in the packet * @param fnum frame number for eiger as it is not in the packet
*/ */
void readFrame(char* c,char** raw, uint32_t &fnum); void readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind);
/** /**
* Closes all files * Closes all files
* @param ithr thread index * @param ithr thread index

View File

@ -61,6 +61,8 @@ class sockaddr_in;
#endif #endif
#include <stdlib.h> /******exit */
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include <iostream> #include <iostream>
@ -558,6 +560,16 @@ enum communicationProtocol{
break; break;
case UDP: case UDP:
if (socketDescriptor<0) return -1; if (socketDescriptor<0) return -1;
/*
cout <<"******listening inside genericsocket"<<endl;
for(int i=0;i<10000;i++){
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,5000, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
cout<<i<<":"<<nsent<<"\t\t";
}
exit(-1);
*/
//if length given, listens to length, else listens for packetsize till length is reached //if length given, listens to length, else listens for packetsize till length is reached
if(length){ if(length){
while(length>0){ while(length>0){

View File

@ -1,11 +1,11 @@
//#define SVNPATH "" //#define SVNPATH ""
#define SVNURL "git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git" #define SVNURL "git@gitorious.psi.ch:sls_det_software/sls_receiver_software.git"
//#define SVNREPPATH "" //#define SVNREPPATH ""
#define SVNREPUUID "1c259aeba8b068b9f6e550d63a9a3a14bd7d3ab7" #define SVNREPUUID "e019a6ce7d96d4ac9cb5762b7137245aedb4d5b8"
//#define SVNREV 0x6 //#define SVNREV 0x22
//#define SVNKIND "" //#define SVNKIND ""
//#define SVNSCHED "" //#define SVNSCHED ""
#define SVNAUTH "Maliakal_Dhanya" #define SVNAUTH "Anna_Bergamaschi"
#define SVNREV 0x6 #define SVNREV 0x22
#define SVNDATE 0x20140603 #define SVNDATE 0x20141015
// //

View File

@ -4,11 +4,12 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#define STRINGIFY(x) #x #define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x) #define TOSTRING(x) STRINGIFY(x)
#define MYCONCAT(x,y) #define MYCONCAT(x,y)
#define __AT__ string(__FILE__) + string("::") + string(__func__) + string("(): ") #define __AT__ string(__FILE__) + string("::") + string(__func__) + string("(): ")
//":" TOSTRING(__LINE__) //":" TOSTRING(__LINE__)

View File

@ -26,9 +26,11 @@ class slsReceiverTCPIPInterface : private virtual slsReceiverDefs {
* @param succecc socket creation was successfull * @param succecc socket creation was successfull
* @param rbase pointer to the receiver base * @param rbase pointer to the receiver base
* @param pn port number (defaults to default port number) * @param pn port number (defaults to default port number)
* @param bot mode is bottom if true, else its a top half module
*/ */
slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn=-1);
slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn=-1, bool bot=false);
/** /**
* Sets the port number to listen to. * Sets the port number to listen to.
Take care that the client must know to whcih port it has to listen to, so normally it is better to use a fixes port from the instatiation or change it from the client. Take care that the client must know to whcih port it has to listen to, so normally it is better to use a fixes port from the instatiation or change it from the client.
@ -272,6 +274,9 @@ private:
/** port number */ /** port number */
int portNumber; int portNumber;
/** true if bottom half module for eiger */
bool bottom;
protected: protected:
/** Socket */ /** Socket */

View File

@ -1,807 +0,0 @@
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
#ifndef SLS_RECEIVER_UDP_FUNCTIONS_H
#define SLS_RECEIVER_UDP_FUNCTIONS_H
/********************************************//**
* @file slsReceiverUDPFunctions.h
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
//#include "sls_receiver_defs.h"
//#include "receiver_defs.h"
//#include "genericSocket.h"
#include "circularFifo.h"
#include "singlePhotonDetector.h"
#include "slsReceiverData.h"
#include "moenchCommonMode.h"
#include "UDPInterface.h"
#include "UDPBaseImplementation.h"
#ifdef MYROOT1
#include <TTree.h>
#include <TFile.h>
#endif
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
/**
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
*/
class slsReceiverUDPFunctions : private virtual slsReceiverDefs, public UDPInterface {
public:
/**
* Constructor
*/
slsReceiverUDPFunctions();
/**
* Destructor
*/
virtual ~slsReceiverUDPFunctions();
/**
* delete and free member parameters
*/
void deleteMembers();
/**
* initialize member parameters
*/
void initializeMembers();
/**
* Set receiver type
* @param det detector type
* Returns success or FAIL
*/
int setDetectorType(detectorType det);
//Frame indices and numbers caught
/**
* Returns current Frame Index Caught for an entire acquisition (including all scans)
*/
uint32_t getAcquisitionIndex();
/**
* Returns if acquisition started
*/
bool getAcquistionStarted();
/**
* Returns Frames Caught for each real time acquisition (eg. for each scan)
*/
int getFramesCaught();
/**
* Returns Total Frames Caught for an entire acquisition (including all scans)
*/
int getTotalFramesCaught();
/**
* Returns the frame index at start of each real time acquisition (eg. for each scan)
*/
uint32_t getStartFrameIndex();
/**
* Returns current Frame Index for each real time acquisition (eg. for each scan)
*/
uint32_t getFrameIndex();
/**
* Returns if measurement started
*/
bool getMeasurementStarted();
/**
* Resets the Total Frames Caught
* This is how the receiver differentiates between entire acquisitions
* Returns 0
*/
void resetTotalFramesCaught();
//file parameters
/**
* Returns File Path
*/
char* getFilePath() const;
/**
* Set File Path
* @param c file path
*/
char* setFilePath(const char c[]);
/**
* Returns File Name
*/
char* getFileName() const;
/**
* Set File Name (without frame index, file index and extension)
* @param c file name
*/
char* setFileName(const char c[]);
/**
* Returns File Index
*/
int getFileIndex();
/**
* Set File Index
* @param i file index
*/
int setFileIndex(int i);
/**
* Set Frame Index Needed
* @param i frame index needed
*/
int setFrameIndexNeeded(int i);
/**
* Set enable file write
* @param i file write enable
* Returns file write enable
*/
int setEnableFileWrite(int i);
/**
* Enable/disable overwrite
* @param i enable
* Returns enable over write
*/
int setEnableOverwrite(int i);
/**
* Returns file write enable
* 1: YES 0: NO
*/
int getEnableFileWrite() const;
/**
* Returns file over write enable
* 1: YES 0: NO
*/
int getEnableOverwrite() const;
//other parameters
/**
* abort acquisition with minimum damage: close open files, cleanup.
* does nothing if state already is 'idle'
*/
void abort() {};
/**
* Returns status of receiver: idle, running or error
*/
runStatus getStatus() const;
/**
* Set detector hostname
* @param c hostname
*/
void initialize(const char *detectorHostName);
/* Returns detector hostname
/returns hostname
* caller needs to deallocate the returned char array.
* if uninitialized, it must return NULL
*/
char *getDetectorHostname() const;
/**
* Set Ethernet Interface or IP to listen to
*/
void setEthernetInterface(char* c);
/**
* Set UDP Port Number
*/
void setUDPPortNo(int p);
/*
* Returns number of frames to receive
* This is the number of frames to expect to receiver from the detector.
* The data receiver will change from running to idle when it got this number of frames
*/
int getNumberOfFrames() const;
/**
* set frame number if a positive number
*/
int32_t setNumberOfFrames(int32_t fnum);
/**
* Returns scan tag
*/
int getScanTag() const;
/**
* set scan tag if its is a positive number
*/
int32_t setScanTag(int32_t stag);
/**
* Returns the number of bits per pixel
*/
int getDynamicRange() const;
/**
* set dynamic range if its is a positive number
*/
int32_t setDynamicRange(int32_t dr);
/**
* Set short frame
* @param i if shortframe i=1
*/
int setShortFrame(int i);
/**
* Set the variable to send every nth frame to gui
* or if 0,send frame only upon gui request
*/
int setNFrameToGui(int i);
/** set acquisition period if a positive number
*/
int64_t setAcquisitionPeriod(int64_t index);
/** get data compression, by saving only hits
*/
bool getDataCompression();
/** enabl data compression, by saving only hits
/returns if failed
*/
int enableDataCompression(bool enable);
/**
* enable 10Gbe
@param enable 1 for 10Gbe or 0 for 1 Gbe, -1 to read out
\returns enable for 10Gbe
*/
int enableTenGiga(int enable = -1);
//other functions
/**
* Returns the buffer-current frame read by receiver
* @param c pointer to current file name
* @param raw address of pointer, pointing to current frame to send to gui
* @param fnum frame number for eiger as it is not in the packet
*/
void readFrame(char* c,char** raw, uint32_t &fnum);
/**
* Closes all files
* @param ithr thread index
*/
void closeFile(int ithr = -1);
/**
* Starts Receiver - starts to listen for packets
* @param message is the error message if there is an error
* Returns success
*/
int startReceiver(char message[]);
/**
* Stops Receiver - stops listening for packets
* Returns success
*/
int stopReceiver();
/** set status to transmitting and
* when fifo is empty later, sets status to run_finished
*/
void startReadout();
/**
* shuts down the udp sockets
* \returns if success or fail
*/
int shutDownUDPSockets();
private:
/**
* Deletes all the filter objects for single photon data
*/
void deleteFilter();
/**
* Constructs the filter for single photon data
*/
void setupFilter();
/**
* set up fifo according to the new numjobsperthread
*/
void setupFifoStructure ();
/**
* Copy frames to gui
* uses semaphore for nth frame mode
*/
void copyFrameToGui(char* startbuf[], uint32_t fnum=-1, char* buf=NULL);
/**
* creates udp sockets
* \returns if success or fail
*/
int createUDPSockets();
/**
* create listening thread
* @param destroy is true to kill all threads and start again
*/
int createListeningThreads(bool destroy = false);
/**
* create writer threads
* @param destroy is true to kill all threads and start again
*/
int createWriterThreads(bool destroy = false);
/**
* set thread priorities
*/
void setThreadPriorities();
/**
* initializes variables and creates the first file
* also does the startAcquisitionCallBack
* \returns FAIL or OK
*/
int setupWriter();
/**
* Creates new tree and file for compression
* @param ithr thread number
* @param iframe frame number
*\returns OK for succces or FAIL for failure
*/
int createCompressionFile(int ithr, int iframe);
/**
* Creates new file
*\returns OK for succces or FAIL for failure
*/
int createNewFile();
/**
* Static function - Thread started which listens to packets.
* Called by startReceiver()
* @param this_pointer pointer to this object
*/
static void* startListeningThread(void *this_pointer);
/**
* Static function - Thread started which writes packets to file.
* Called by startReceiver()
* @param this_pointer pointer to this object
*/
static void* startWritingThread(void *this_pointer);
/**
* Thread started which listens to packets.
* Called by startReceiver()
*
*/
int startListening();
/**
* Thread started which writes packets to file.
* Called by startReceiver()
*
*/
int startWriting();
/**
* Writing to file without compression
* @param buf is the address of buffer popped out of fifo
* @param numpackets is the number of packets
* @param framenum current frame number
*/
void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum);
/**
* Its called for the first packet of a scan or acquistion
* Sets the startframeindices and the variables to know if acquisition started
* @param ithread listening thread number
*/
void startFrameIndices(int ithread);
/**
* This is called when udp socket is shut down
* It pops ffff instead of packet number into fifo
* to inform writers about the end of listening session
* @param ithread listening thread number
* @param rc number of bytes received
* @param pc packet count
* @param t total packets listened to
*/
void stopListening(int ithread, int rc, int &pc, int &t);
/**
* When acquisition is over, this is called
* @param ithread listening thread number
* @param wbuffer writer buffer
*/
void stopWriting(int ithread, char* wbuffer[]);
/**
* data compression for each fifo output
* @param ithread listening thread number
* @param wbuffer writer buffer
* @param npackets number of packets from the fifo
* @param data pointer to the next packet start
* @param xmax max pixels in x direction
* @param ymax max pixels in y direction
* @param nf nf
*/
void handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf);
/** structure of an eiger image header*/
typedef struct
{
unsigned char header_before[20];
unsigned char fnum[4];
unsigned char header_after[24];
} eiger_image_header;
/** structure of an eiger image header*/
typedef struct
{
unsigned char num1[4];
unsigned char num2[4];
} eiger_packet_header;
/** max number of listening threads */
const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS;
/** max number of writer threads */
const static int MAX_NUM_WRITER_THREADS = 15;
/** detector type */
detectorType myDetectorType;
/** detector hostname */
char detHostname[MAX_STR_LENGTH];
/** status of receiver */
runStatus status;
/** UDP Socket between Receiver and Detector */
genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS];
/** Server UDP Port*/
int server_port[MAX_NUM_LISTENING_THREADS];
/** ethernet interface or IP to listen to */
char *eth;
/** max packets per file **/
int maxPacketsPerFile;
/** File write enable */
int enableFileWrite;
/** File over write enable */
int overwrite;
/** Complete File name */
char savefilename[MAX_STR_LENGTH];
/** File Name without frame index, file index and extension*/
char fileName[MAX_STR_LENGTH];
/** File Path */
char filePath[MAX_STR_LENGTH];
/** File Index */
int fileIndex;
/** scan tag */
int scanTag;
/** if frame index required in file name */
int frameIndexNeeded;
/* Acquisition started */
bool acqStarted;
/* Measurement started */
bool measurementStarted;
/** Frame index at start of each real time acquisition (eg. for each scan) */
uint32_t startFrameIndex;
/** Actual current frame index of each time acquisition (eg. for each scan) */
uint32_t frameIndex;
/** Frames Caught for each real time acquisition (eg. for each scan) */
int packetsCaught;
/** Total packets caught for an entire acquisition (including all scans) */
int totalPacketsCaught;
/** Pckets currently in current file, starts new file when it reaches max */
int packetsInFile;
/** Frame index at start of an entire acquisition (including all scans) */
uint32_t startAcquisitionIndex;
/** Actual current frame index of an entire acquisition (including all scans) */
uint32_t acquisitionIndex;
/** number of packets per frame*/
int packetsPerFrame;
/** frame index mask */
uint32_t frameIndexMask;
/** packet index mask */
uint32_t packetIndexMask;
/** frame index offset */
int frameIndexOffset;
/** acquisition period */
int64_t acquisitionPeriod;
/** frame number */
int32_t numberOfFrames;
/** dynamic range */
int dynamicRange;
/** short frames */
int shortFrame;
/** current frame number */
uint32_t currframenum;
/** Previous Frame number from buffer */
uint32_t prevframenum;
/** size of one frame */
int frameSize;
/** buffer size. different from framesize as we wait for one packet instead of frame for eiger */
int bufferSize;
/** oen buffer size */
int onePacketSize;
/** latest data */
char* latestData;
/** gui data ready */
int guiDataReady;
/** points to the data to send to gui */
char* guiData;
/** points to the filename to send to gui */
char* guiFileName;
/** temporary number for eiger frame number as its not included in the packet */
uint32_t guiFrameNumber;
/** send every nth frame to gui or only upon gui request*/
int nFrameToGui;
/** fifo size */
unsigned int fifosize;
/** number of jobs per thread for data compression */
int numJobsPerThread;
/** datacompression - save only hits */
bool dataCompression;
/** memory allocated for the buffer */
char *mem0[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data read */
CircularFifo<char>* fifo[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data already written and ready to be resued*/
CircularFifo<char>* fifoFree[MAX_NUM_LISTENING_THREADS];
/** Receiver buffer */
char *buffer[MAX_NUM_LISTENING_THREADS];
/** number of writer threads */
int numListeningThreads;
/** number of writer threads */
int numWriterThreads;
/** to know if listening and writer threads created properly */
int thread_started;
/** current listening thread index*/
int currentListeningThreadIndex;
/** current writer thread index*/
int currentWriterThreadIndex;
/** thread listening to packets */
pthread_t listening_thread[MAX_NUM_LISTENING_THREADS];
/** thread writing packets */
pthread_t writing_thread[MAX_NUM_WRITER_THREADS];
/** total frame count the listening thread has listened to */
int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS];
/** mask showing which listening threads are running */
volatile uint32_t listeningthreads_mask;
/** mask showing which writer threads are running */
volatile uint32_t writerthreads_mask;
/** mask showing which threads have created files*/
volatile uint32_t createfile_mask;
/** OK if file created was successful */
int ret_createfile;
/** variable used to self terminate threads waiting for semaphores */
int killAllListeningThreads;
/** variable used to self terminate threads waiting for semaphores */
int killAllWritingThreads;
/** 10Gbe enable*/
int tengigaEnable;
//semaphores
/** semaphore to synchronize writer and guireader threads */
sem_t smp;
/** semaphore to synchronize listener threads */
sem_t listensmp[MAX_NUM_LISTENING_THREADS];
/** semaphore to synchronize writer threads */
sem_t writersmp[MAX_NUM_WRITER_THREADS];
//mutex
/** guiDataReady mutex */
pthread_mutex_t dataReadyMutex;
/** mutex for status */
pthread_mutex_t status_mutex;
/** mutex for progress variable currframenum */
pthread_mutex_t progress_mutex;
/** mutex for writing data to file */
pthread_mutex_t write_mutex;
/** File Descriptor */
FILE *sfilefd;
//filter
singlePhotonDetector<uint16_t> *singlePhotonDet[MAX_NUM_WRITER_THREADS];
slsReceiverData<uint16_t> *receiverdata[MAX_NUM_WRITER_THREADS];
moenchCommonMode *cmSub;
bool commonModeSubtractionEnable;
#ifdef MYROOT1
/** Tree where the hits are stored */
TTree *myTree[MAX_NUM_WRITER_THREADS];
/** File where the tree is saved */
TFile *myFile[MAX_NUM_WRITER_THREADS];
#endif
/**
callback arguments are
filepath
filename
fileindex
data size
return value is
0 callback takes care of open,close,write file
1 callback writes file, we have to open, close it
2 we open, close, write file, callback does not do anything
*/
int (*startAcquisitionCallBack)(char*, char*,int, int, void*);
void *pStartAcquisition;
/**
args to acquisition finished callback
total frames caught
*/
void (*acquisitionFinishedCallBack)(int, void*);
void *pAcquisitionFinished;
/**
args to raw data ready callback are
framenum
datapointer
datasize in bytes
file descriptor
guidatapointer (NULL, no data required)
*/
void (*rawDataReadyCallBack)(int, char*, int, FILE*, char*, void*);
void *pRawDataReady;
/** The action which decides what the user and default responsibilites to save data are
* 0 raw data ready callback takes care of open,close,write file
* 1 callback writes file, we have to open, close it
* 2 we open, close, write file, callback does not do anything */
int cbAction;
public:
/**
callback arguments are
filepath
filename
fileindex
datasize
return value is
0 callback takes care of open,close,wrie file
1 callback writes file, we have to open, close it
2 we open, close, write file, callback does not do anything
*/
void registerCallBackStartAcquisition(int (*func)(char*, char*,int, int, void*),void *arg){startAcquisitionCallBack=func; pStartAcquisition=arg;};
/**
callback argument is
toatal frames caught
*/
void registerCallBackAcquisitionFinished(void (*func)(int, void*),void *arg){acquisitionFinishedCallBack=func; pAcquisitionFinished=arg;};
/**
args to raw data ready callback are
framenum
datapointer
datasize in bytes
file descriptor
guidatapointer (NULL, no data required)
*/
void registerCallBackRawDataReady(void (*func)(int, char*, int, FILE*, char*, void*),void *arg){rawDataReadyCallBack=func; pRawDataReady=arg;};
};
#endif
#endif

View File

@ -55,7 +55,9 @@ public:
GOTTHARD, /**< gotthard */ GOTTHARD, /**< gotthard */
PICASSO, /**< picasso */ PICASSO, /**< picasso */
AGIPD, /**< agipd */ AGIPD, /**< agipd */
MOENCH /**< moench */ MOENCH, /**< moench */
JUNGFRAU, /**< jungfrau */
JUNGFRAUCTB /**< jungfrauCTBversion */
}; };
@ -85,7 +87,9 @@ public:
MEASUREMENT_TIME, /**< Time of the measurement from the detector (fifo) */ MEASUREMENT_TIME, /**< Time of the measurement from the detector (fifo) */
PROGRESS, /**< fraction of measurement elapsed - only get! */ PROGRESS, /**< fraction of measurement elapsed - only get! */
MEASUREMENTS_NUMBER MEASUREMENTS_NUMBER,
FRAMES_FROM_START,
FRAMES_FROM_START_PG
}; };

View File

@ -8,7 +8,7 @@
enum { enum {
//General functions //General functions
F_EXEC_RECEIVER_COMMAND=0, /**< command is executed */ F_EXEC_RECEIVER_COMMAND=128, /**< command is executed */
F_EXIT_RECEIVER, /**< turn off receiver server */ F_EXIT_RECEIVER, /**< turn off receiver server */
F_LOCK_RECEIVER, /**< Locks/Unlocks server communication to the given client */ F_LOCK_RECEIVER, /**< Locks/Unlocks server communication to the given client */
F_GET_LAST_RECEIVER_CLIENT_IP, /**< returns the IP of the client last connected to the receiver */ F_GET_LAST_RECEIVER_CLIENT_IP, /**< returns the IP of the client last connected to the receiver */

View File

@ -29,7 +29,8 @@ using namespace std;
UDPBaseImplementation::UDPBaseImplementation(){} UDPBaseImplementation::UDPBaseImplementation(){
}
UDPBaseImplementation::~UDPBaseImplementation(){} UDPBaseImplementation::~UDPBaseImplementation(){}
@ -52,8 +53,7 @@ void UDPBaseImplementation::initializeMembers(){
int UDPBaseImplementation::setDetectorType(detectorType det){ int UDPBaseImplementation::setDetectorType(detectorType det){
cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl;
cout << "Setting Receiver Type " << endl;
cout << "Setting Receiver Type " << endl;
deleteMembers(); deleteMembers();
initializeMembers(); initializeMembers();
@ -75,7 +75,7 @@ int UDPBaseImplementation::setDetectorType(detectorType det){
return FAIL; return FAIL;
break; break;
} }
/*
//moench variables //moench variables
if(myDetectorType == GOTTHARD){ if(myDetectorType == GOTTHARD){
fifosize = GOTTHARD_FIFO_SIZE; fifosize = GOTTHARD_FIFO_SIZE;
@ -137,7 +137,7 @@ int UDPBaseImplementation::setDetectorType(detectorType det){
cout << "Ready..." << endl; cout << "Ready..." << endl;
return OK; return OK;
*/
return OK; return OK;
} }
@ -147,17 +147,17 @@ int UDPBaseImplementation::setDetectorType(detectorType det){
/*Frame indices and numbers caught*/ /*Frame indices and numbers caught*/
bool UDPBaseImplementation::getAcquistionStarted(){return acqStarted;}; bool UDPBaseImplementation::getAcquistionStarted(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return acqStarted;};
bool UDPBaseImplementation::getMeasurementStarted(){return measurementStarted;}; bool UDPBaseImplementation::getMeasurementStarted(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return measurementStarted;};
int UDPBaseImplementation::getFramesCaught(){return (packetsCaught/packetsPerFrame);} int UDPBaseImplementation::getFramesCaught(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return (packetsCaught/packetsPerFrame);}
int UDPBaseImplementation::getTotalFramesCaught(){return (totalPacketsCaught/packetsPerFrame);} int UDPBaseImplementation::getTotalFramesCaught(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return (totalPacketsCaught/packetsPerFrame);}
uint32_t UDPBaseImplementation::getStartFrameIndex(){return startFrameIndex;} uint32_t UDPBaseImplementation::getStartFrameIndex(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return startFrameIndex;}
uint32_t UDPBaseImplementation::getFrameIndex(){ uint32_t UDPBaseImplementation::getFrameIndex(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(!packetsCaught) if(!packetsCaught)
frameIndex=-1; frameIndex=-1;
else else
@ -166,7 +166,7 @@ uint32_t UDPBaseImplementation::getFrameIndex(){
} }
uint32_t UDPBaseImplementation::getAcquisitionIndex(){ uint32_t UDPBaseImplementation::getAcquisitionIndex(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(!totalPacketsCaught) if(!totalPacketsCaught)
acquisitionIndex=-1; acquisitionIndex=-1;
else else
@ -175,7 +175,7 @@ uint32_t UDPBaseImplementation::getAcquisitionIndex(){
} }
void UDPBaseImplementation::resetTotalFramesCaught(){ void UDPBaseImplementation::resetTotalFramesCaught(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
acqStarted = false; acqStarted = false;
startAcquisitionIndex = 0; startAcquisitionIndex = 0;
totalPacketsCaught = 0; totalPacketsCaught = 0;
@ -185,10 +185,12 @@ void UDPBaseImplementation::resetTotalFramesCaught(){
/*file parameters*/ /*file parameters*/
char* UDPBaseImplementation::getFilePath() const{ char* UDPBaseImplementation::getFilePath() const{
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
return (char*)filePath; return (char*)filePath;
} }
inline char* UDPBaseImplementation::setFilePath(const char c[]){ inline char* UDPBaseImplementation::setFilePath(const char c[]){ FILE_LOG(logDEBUG) << __AT__ << " starting";
FILE_LOG(logDEBUG) << __AT__ << "called"; FILE_LOG(logDEBUG) << __AT__ << "called";
if(strlen(c)){ if(strlen(c)){
//check if filepath exists //check if filepath exists
@ -207,10 +209,12 @@ inline char* UDPBaseImplementation::setFilePath(const char c[]){
char* UDPBaseImplementation::getFileName() const{ char* UDPBaseImplementation::getFileName() const{
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
return (char*)fileName; return (char*)fileName;
} }
inline char* UDPBaseImplementation::setFileName(const char c[]){ inline char* UDPBaseImplementation::setFileName(const char c[]){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; //cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
if(strlen(c)) if(strlen(c))
@ -220,11 +224,11 @@ inline char* UDPBaseImplementation::setFileName(const char c[]){
} }
int UDPBaseImplementation::getFileIndex(){ int UDPBaseImplementation::getFileIndex(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileIndex; return fileIndex;
} }
int UDPBaseImplementation::setFileIndex(int i){ int UDPBaseImplementation::setFileIndex(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; //cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
if(i>=0) if(i>=0)
fileIndex = i; fileIndex = i;
@ -232,7 +236,7 @@ int UDPBaseImplementation::setFileIndex(int i){
} }
int UDPBaseImplementation::setFrameIndexNeeded(int i){ int UDPBaseImplementation::setFrameIndexNeeded(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl; //cout << "[WARNING] This is a base implementation, " << __func__ << " could have no effects." << endl;
frameIndexNeeded = i; frameIndexNeeded = i;
return frameIndexNeeded; return frameIndexNeeded;
@ -240,19 +244,23 @@ int UDPBaseImplementation::setFrameIndexNeeded(int i){
int UDPBaseImplementation::getEnableFileWrite() const{ int UDPBaseImplementation::getEnableFileWrite() const{
return enableFileWrite; FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
return enableFileWrite;
} }
int UDPBaseImplementation::setEnableFileWrite(int i){ int UDPBaseImplementation::setEnableFileWrite(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
enableFileWrite=i; enableFileWrite=i;
return getEnableFileWrite(); return getEnableFileWrite();
} }
int UDPBaseImplementation::getEnableOverwrite() const{ int UDPBaseImplementation::getEnableOverwrite() const{
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
return overwrite; return overwrite;
} }
int UDPBaseImplementation::setEnableOverwrite(int i){ int UDPBaseImplementation::setEnableOverwrite(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
overwrite=i; overwrite=i;
return getEnableOverwrite(); return getEnableOverwrite();
} }
@ -268,7 +276,7 @@ slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{
} }
void UDPBaseImplementation::initialize(const char *detectorHostName){ void UDPBaseImplementation::initialize(const char *detectorHostName){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(strlen(detectorHostName)) if(strlen(detectorHostName))
strcpy(detHostname,detectorHostName); strcpy(detHostname,detectorHostName);
} }
@ -278,15 +286,18 @@ char *UDPBaseImplementation::getDetectorHostname() const{
return (char*)detHostname; return (char*)detHostname;
} }
void UDPBaseImplementation::setEthernetInterface(char* c){ void UDPBaseImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(eth,c); strcpy(eth,c);
} }
void UDPBaseImplementation::setUDPPortNo(int p){ void UDPBaseImplementation::setUDPPortNo(int p){
for(int i=0;i<numListeningThreads;i++){ server_port[0] = p;
server_port[i] = p+i; }
}
void UDPBaseImplementation::setUDPPortNo2(int p){
server_port[1] = p;
} }
@ -295,7 +306,7 @@ int UDPBaseImplementation::getNumberOfFrames() const {
} }
int32_t UDPBaseImplementation::setNumberOfFrames(int32_t fnum){ int32_t UDPBaseImplementation::setNumberOfFrames(int32_t fnum){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(fnum >= 0) if(fnum >= 0)
numberOfFrames = fnum; numberOfFrames = fnum;
@ -308,7 +319,7 @@ int UDPBaseImplementation::getScanTag() const{
} }
int32_t UDPBaseImplementation::setScanTag(int32_t stag){ int32_t UDPBaseImplementation::setScanTag(int32_t stag){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(stag >= 0) if(stag >= 0)
scanTag = stag; scanTag = stag;
@ -320,7 +331,7 @@ int UDPBaseImplementation::getDynamicRange() const{
return dynamicRange; return dynamicRange;
} }
int32_t UDPBaseImplementation::setDynamicRange(int32_t dr){ int32_t UDPBaseImplementation::setDynamicRange(int32_t dr){ FILE_LOG(logDEBUG) << __AT__ << " starting";
cout << "Setting Dynamic Range" << endl; cout << "Setting Dynamic Range" << endl;
int olddr = dynamicRange; int olddr = dynamicRange;
@ -380,7 +391,7 @@ int32_t UDPBaseImplementation::setDynamicRange(int32_t dr){
int UDPBaseImplementation::setShortFrame(int i){ int UDPBaseImplementation::setShortFrame(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
shortFrame=i; shortFrame=i;
if(shortFrame!=-1){ if(shortFrame!=-1){
@ -410,7 +421,7 @@ int UDPBaseImplementation::setShortFrame(int i){
} }
int UDPBaseImplementation::setNFrameToGui(int i){ int UDPBaseImplementation::setNFrameToGui(int i){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(i>=0){ if(i>=0){
nFrameToGui = i; nFrameToGui = i;
setupFifoStructure(); setupFifoStructure();
@ -420,7 +431,7 @@ int UDPBaseImplementation::setNFrameToGui(int i){
int64_t UDPBaseImplementation::setAcquisitionPeriod(int64_t index){ int64_t UDPBaseImplementation::setAcquisitionPeriod(int64_t index){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if(index >= 0){ if(index >= 0){
if(index != acquisitionPeriod){ if(index != acquisitionPeriod){
@ -432,9 +443,9 @@ int64_t UDPBaseImplementation::setAcquisitionPeriod(int64_t index){
} }
bool UDPBaseImplementation::getDataCompression(){return dataCompression;} bool UDPBaseImplementation::getDataCompression(){ FILE_LOG(logDEBUG) << __AT__ << " starting";return dataCompression;}
int UDPBaseImplementation::enableDataCompression(bool enable){ int UDPBaseImplementation::enableDataCompression(bool enable){ FILE_LOG(logDEBUG) << __AT__ << " starting";
cout << "Data compression "; cout << "Data compression ";
if(enable) if(enable)
cout << "enabled" << endl; cout << "enabled" << endl;
@ -487,7 +498,7 @@ int UDPBaseImplementation::enableDataCompression(bool enable){
/*other functions*/ /*other functions*/
void UDPBaseImplementation::deleteFilter(){ void UDPBaseImplementation::deleteFilter(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
cmSub=NULL; cmSub=NULL;
@ -504,7 +515,7 @@ void UDPBaseImplementation::deleteFilter(){
} }
void UDPBaseImplementation::setupFilter(){ void UDPBaseImplementation::setupFilter(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
double hc = 0; double hc = 0;
double sigma = 5; double sigma = 5;
int sign = 1; int sign = 1;
@ -540,7 +551,7 @@ void UDPBaseImplementation::setupFilter(){
//LEO: it is not clear to me.. //LEO: it is not clear to me..
void UDPBaseImplementation::setupFifoStructure(){ void UDPBaseImplementation::setupFifoStructure(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int64_t i; int64_t i;
int oldn = numJobsPerThread; int oldn = numJobsPerThread;
@ -628,42 +639,44 @@ void UDPBaseImplementation::setupFifoStructure(){
/** acquisition functions */ /** acquisition functions */
void UDPBaseImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t& fstartind){
void UDPBaseImplementation::readFrame(char* c,char** raw, uint32_t &fnum){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//point to gui data
if (guiData == NULL) //point to gui data
guiData = latestData; if (guiData == NULL)
guiData = latestData;
//copy data and filename
strcpy(c,guiFileName); //copy data and filename
fnum = guiFrameNumber; strcpy(c,guiFileName);
fnum = guiFrameNumber;
fstartind = getStartFrameIndex();
//could not get gui data
if(!guiDataReady){ //could not get gui data
*raw = NULL; if(!guiDataReady){
} *raw = NULL;
//data ready, set guidata to receive new data }
else{ //data ready, set guidata to receive new data
*raw = guiData; else{
guiData = NULL; *raw = guiData;
guiData = NULL;
pthread_mutex_lock(&dataReadyMutex);
guiDataReady = 0; pthread_mutex_lock(&dataReadyMutex);
pthread_mutex_unlock(&dataReadyMutex); guiDataReady = 0;
if((nFrameToGui) && (writerthreads_mask)){ pthread_mutex_unlock(&dataReadyMutex);
/*if(nFrameToGui){*/ if((nFrameToGui) && (writerthreads_mask)){
//release after getting data /*if(nFrameToGui){*/
sem_post(&smp); //release after getting data
} sem_post(&smp);
} }
}
} }
void UDPBaseImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){
void UDPBaseImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char* buf){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//random read when gui not ready //random read when gui not ready
if((!nFrameToGui) && (!guiData)){ if((!nFrameToGui) && (!guiData)){
@ -718,9 +731,7 @@ void UDPBaseImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum, char
int UDPBaseImplementation::createUDPSockets(){ int UDPBaseImplementation::createUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
//if eth is mistaken with ip address //if eth is mistaken with ip address
if (strchr(eth,'.')!=NULL) if (strchr(eth,'.')!=NULL)
@ -764,7 +775,7 @@ int UDPBaseImplementation::createUDPSockets(){
int UDPBaseImplementation::shutDownUDPSockets(){ int UDPBaseImplementation::shutDownUDPSockets(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
if(udpSocket[i]){ if(udpSocket[i]){
udpSocket[i]->ShutDownSocket(); udpSocket[i]->ShutDownSocket();
@ -779,7 +790,7 @@ int UDPBaseImplementation::shutDownUDPSockets(){
int UDPBaseImplementation::createListeningThreads(bool destroy){ int UDPBaseImplementation::createListeningThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
void* status; void* status;
@ -834,7 +845,7 @@ int UDPBaseImplementation::createListeningThreads(bool destroy){
int UDPBaseImplementation::createWriterThreads(bool destroy){ int UDPBaseImplementation::createWriterThreads(bool destroy){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
void* status; void* status;
@ -895,7 +906,7 @@ int UDPBaseImplementation::createWriterThreads(bool destroy){
void UDPBaseImplementation::setThreadPriorities(){ void UDPBaseImplementation::setThreadPriorities(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
//assign priorities //assign priorities
struct sched_param tcp_param, listen_param, write_param; struct sched_param tcp_param, listen_param, write_param;
@ -932,7 +943,7 @@ void UDPBaseImplementation::setThreadPriorities(){
int UDPBaseImplementation::setupWriter(){ int UDPBaseImplementation::setupWriter(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//reset writing thread variables //reset writing thread variables
packetsInFile=0; packetsInFile=0;
@ -1017,7 +1028,7 @@ int UDPBaseImplementation::setupWriter(){
int UDPBaseImplementation::createCompressionFile(int ithr, int iframe){ int UDPBaseImplementation::createCompressionFile(int ithr, int iframe){ FILE_LOG(logDEBUG) << __AT__ << " starting";
#ifdef MYROOT1 #ifdef MYROOT1
char temp[MAX_STR_LENGTH]; char temp[MAX_STR_LENGTH];
//create file name for gui purposes, and set up acquistion parameters //create file name for gui purposes, and set up acquistion parameters
@ -1045,7 +1056,7 @@ int UDPBaseImplementation::createCompressionFile(int ithr, int iframe){
int UDPBaseImplementation::createNewFile(){ int UDPBaseImplementation::createNewFile(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl; cout << "[WARNING] This is a base implementation, " << __func__ << " not correctly implemented" << endl;
@ -1112,7 +1123,7 @@ void UDPBaseImplementation::closeFile(int ithr)
{ {
FILE_LOG(logDEBUG) << __AT__ << "called"; FILE_LOG(logDEBUG) << __AT__ << "called";
/*
if(!dataCompression){ if(!dataCompression){
if(sfilefd){ if(sfilefd){
#ifdef VERBOSE #ifdef VERBOSE
@ -1159,7 +1170,7 @@ void UDPBaseImplementation::closeFile(int ithr)
#endif #endif
} }
*/
FILE_LOG(logDEBUG) << __AT__ << "exited"; FILE_LOG(logDEBUG) << __AT__ << "exited";
} }
@ -1168,7 +1179,7 @@ void UDPBaseImplementation::closeFile(int ithr)
int UDPBaseImplementation::startReceiver(char message[]){ int UDPBaseImplementation::startReceiver(char message[]){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
@ -1236,7 +1247,7 @@ int UDPBaseImplementation::startReceiver(char message[]){
int UDPBaseImplementation::stopReceiver(){ int UDPBaseImplementation::stopReceiver(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//#ifdef VERBOSE //#ifdef VERBOSE
@ -1266,7 +1277,7 @@ int UDPBaseImplementation::stopReceiver(){
void UDPBaseImplementation::startReadout(){ void UDPBaseImplementation::startReadout(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
//#ifdef VERBOSE //#ifdef VERBOSE
cout << "Start Receiver Readout" << endl; cout << "Start Receiver Readout" << endl;
@ -1289,7 +1300,7 @@ void UDPBaseImplementation::startReadout(){
void* UDPBaseImplementation::startListeningThread(void* this_pointer){ void* UDPBaseImplementation::startListeningThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " starting";
((UDPBaseImplementation*)this_pointer)->startListening(); ((UDPBaseImplementation*)this_pointer)->startListening();
return this_pointer; return this_pointer;
@ -1297,7 +1308,7 @@ void* UDPBaseImplementation::startListeningThread(void* this_pointer){
void* UDPBaseImplementation::startWritingThread(void* this_pointer){ void* UDPBaseImplementation::startWritingThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " starting";
((UDPBaseImplementation*)this_pointer)->startWriting(); ((UDPBaseImplementation*)this_pointer)->startWriting();
return this_pointer; return this_pointer;
} }
@ -1307,7 +1318,7 @@ void* UDPBaseImplementation::startWritingThread(void* this_pointer){
int UDPBaseImplementation::startListening(){ int UDPBaseImplementation::startListening(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int ithread = currentListeningThreadIndex; int ithread = currentListeningThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "In startListening() " << endl; cout << "In startListening() " << endl;
@ -1508,7 +1519,7 @@ int UDPBaseImplementation::startListening(){
int UDPBaseImplementation::startWriting(){ int UDPBaseImplementation::startWriting(){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int ithread = currentWriterThreadIndex; int ithread = currentWriterThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << ithread << "In startWriting()" <<endl; cout << ithread << "In startWriting()" <<endl;
@ -1701,7 +1712,7 @@ int loop;
void UDPBaseImplementation::startFrameIndices(int ithread){ void UDPBaseImplementation::startFrameIndices(int ithread){ FILE_LOG(logDEBUG) << __AT__ << " starting";
if (myDetectorType == EIGER) if (myDetectorType == EIGER)
//add currframenum later in this method for scans //add currframenum later in this method for scans
@ -1735,7 +1746,7 @@ void UDPBaseImplementation::startFrameIndices(int ithread){
void UDPBaseImplementation::stopListening(int ithread, int rc, int &pc, int &t){ void UDPBaseImplementation::stopListening(int ithread, int rc, int &pc, int &t){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i; int i;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
@ -1813,7 +1824,7 @@ int i;
void UDPBaseImplementation::stopWriting(int ithread, char* wbuffer[]){ void UDPBaseImplementation::stopWriting(int ithread, char* wbuffer[]){ FILE_LOG(logDEBUG) << __AT__ << " starting";
int i,j; int i,j;
#ifdef VERYDEBUG #ifdef VERYDEBUG
cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl; cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl;
@ -1881,6 +1892,10 @@ void UDPBaseImplementation::stopWriting(int ithread, char* wbuffer[]){
void UDPBaseImplementation::writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum){ void UDPBaseImplementation::writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum){
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
int packetsToSave, offset,lastpacket; int packetsToSave, offset,lastpacket;
uint32_t tempframenum = framenum; uint32_t tempframenum = framenum;
@ -1983,6 +1998,8 @@ void UDPBaseImplementation::writeToFile_withoutCompression(char* buf,int numpack
void UDPBaseImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){ void UDPBaseImplementation::handleDataCompression(int ithread, char* wbuffer[], int &npackets, char* data, int xmax, int ymax, int &nf){
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
#if defined(MYROOT1) && defined(ALLFILE_DEBUG) #if defined(MYROOT1) && defined(ALLFILE_DEBUG)
writeToFile_withoutCompression(wbuf[0], numpackets,currframenum); writeToFile_withoutCompression(wbuf[0], numpackets,currframenum);
@ -2082,6 +2099,8 @@ void UDPBaseImplementation::handleDataCompression(int ithread, char* wbuffer[],
int UDPBaseImplementation::enableTenGiga(int enable){ int UDPBaseImplementation::enableTenGiga(int enable){
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
cout << "Enabling 10Gbe to" << enable << endl; cout << "Enabling 10Gbe to" << enable << endl;

View File

@ -19,6 +19,7 @@ using namespace std;
using namespace std; using namespace std;
// TODO: I do not really like passing a bottom-top boolean to the constructor...
UDPInterface * UDPInterface::create(string receiver_type){ UDPInterface * UDPInterface::create(string receiver_type){
if (receiver_type == "standard"){ if (receiver_type == "standard"){

View File

@ -36,8 +36,9 @@ using namespace std;
UDPRESTImplementation::UDPRESTImplementation(){ UDPRESTImplementation::UDPRESTImplementation(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << "PID: " << getpid() << __AT__ << " called";
//TODO I do not really know what to do with bottom...
// Default values // Default values
rest_hostname = "localhost"; rest_hostname = "localhost";
rest_port = 8081; rest_port = 8081;
@ -73,6 +74,17 @@ void UDPRESTImplementation::configure(map<string, string> config_map){
}; };
int UDPRESTImplementation::get_rest_state(RestHelper * rest, string *rest_state){
JsonBox::Value answer;
//string rest_state = "";
int code = rest->get_json("state", &answer);
if ( code != -1 ){
(*rest_state) = answer["state"].getString();
}
return code;
};
void UDPRESTImplementation::initialize_REST(){ void UDPRESTImplementation::initialize_REST(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
@ -91,7 +103,7 @@ void UDPRESTImplementation::initialize_REST(){
int code; int code;
try{ try{
rest->init(rest_hostname, rest_port); rest->init(rest_hostname, rest_port);
code = rest->get_json("state", &answer); code = get_rest_state(rest, &answer);
if (code != 0){ if (code != 0){
throw answer; throw answer;
@ -115,7 +127,7 @@ void UDPRESTImplementation::initialize_REST(){
stringstream ss; stringstream ss;
string test; string test;
std::cout << "GetSTring: " << json_request << std::endl; //std::cout << "GetSTring: " << json_request << std::endl;
json_request.writeToStream(ss, false); json_request.writeToStream(ss, false);
//ss << json_request; //ss << json_request;
ss >> test; ss >> test;
@ -125,7 +137,7 @@ void UDPRESTImplementation::initialize_REST(){
code = rest->post_json("state/initialize", &answer, test); code = rest->post_json("state/initialize", &answer, test);
FILE_LOG(logDEBUG) << __AT__ << "state/configure got " << code; FILE_LOG(logDEBUG) << __AT__ << "state/configure got " << code;
code = rest->get_json("state", &answer); code = rest->get_json("state", &answer);
FILE_LOG(logDEBUG) << __AT__ << "state got " << code << " " << answer; FILE_LOG(logDEBUG) << __AT__ << "state got " << code << " " << answer << "\n";
/* /*
@ -283,13 +295,22 @@ void UDPRESTImplementation::setEthernetInterface(char* c){
//FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " done"; //FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " done";
} }
/*
void UDPRESTImplementation::setUDPPortNo(int p){ void UDPRESTImplementation::setUDPPortNo(int p){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
server_port[i] = p+i; server_port[i] = p+i;
} }
} }
*/
void UDPRESTImplementation::setUDPPortNo(int p){
server_port[0] = p;
}
void UDPRESTImplementation::setUDPPortNo2(int p){
server_port[1] = p;
}
/* /*
int UDPRESTImplementation::getNumberOfFrames() const { int UDPRESTImplementation::getNumberOfFrames() const {
@ -333,7 +354,13 @@ int32_t UDPRESTImplementation::setDynamicRange(int32_t dr){
} }
/*
int32_t UDPRESTImplementation::getDynamicRange() const{
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting";
return dynamicRange;
}
*/
int UDPRESTImplementation::setShortFrame(int i){ int UDPRESTImplementation::setShortFrame(int i){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
@ -550,8 +577,7 @@ void UDPRESTImplementation::setupFifoStructure(){
/** acquisition functions */ /** acquisition functions */
void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind){
void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data //point to gui data
if (guiData == NULL) if (guiData == NULL)
@ -560,7 +586,7 @@ void UDPRESTImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
//copy data and filename //copy data and filename
strcpy(c,guiFileName); strcpy(c,guiFileName);
fnum = guiFrameNumber; fnum = guiFrameNumber;
fstartind = getStartFrameIndex();
//could not get gui data //could not get gui data
if(!guiDataReady){ if(!guiDataReady){
@ -692,35 +718,54 @@ int UDPRESTImplementation::shutDownUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << "called"; FILE_LOG(logDEBUG) << __AT__ << "called";
std::string answer; // this is just to be sure, it could be removed
int code = rest->get_json("state", &answer);
std::cout << answer << std::endl;
code = rest->post_json("state/close", &answer);
std::cout << answer << std::endl;
code = rest->post_json("state/reset", &answer);
std::cout << answer << std::endl;
code = rest->get_json("state", &answer);
std::cout << answer << std::endl;
status = slsReceiverDefs::RUN_FINISHED;
/*
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
if(udpSocket[i]){ if(udpSocket[i]){
FILE_LOG(logDEBUG) << __AT__ << " UDP socket " << i; FILE_LOG(logDEBUG) << __AT__ << " closing UDP socket #" << i;
udpSocket[i]->ShutDownSocket(); udpSocket[i]->ShutDownSocket();
delete udpSocket[i]; delete udpSocket[i];
udpSocket[i] = NULL; udpSocket[i] = NULL;
} }
} }
*/
JsonBox::Value answer;
int code;
string be_state = "";
FILE_LOG(logDEBUG) << __AT__ << " numListeningThreads=" << numListeningThreads;
if (rest == NULL){
FILE_LOG(logWARNING) << __AT__ << "No REST object initialized, closing...";
return OK;
}
// getting the state
FILE_LOG(logWARNING) << "PLEASE WAIT WHILE CHECKING AND SHUTTING DOWN ALL CONNECTIONS!";
code = rest->get_json("state", &answer);
be_state = answer["state"].getString();
// LEO: this is probably wrong
if (be_state == "OPEN"){
while (be_state != "TRANSIENT"){
code = rest->get_json("state", &answer);
be_state = answer["state"].getString();
cout << "be_State: " << be_state << endl;
usleep(10000);
}
code = rest->post_json("state/close", &answer);
std::cout <<code << " " << answer << std::endl;
code = rest->post_json("state/reset", &answer);
std::cout << code << " " << answer << std::endl;
code = rest->get_json("state", &answer);
std::cout << code << " " << answer << std::endl;
}
status = slsReceiverDefs::RUN_FINISHED;
//LEO: not sure it's needed
delete rest;
FILE_LOG(logDEBUG) << __AT__ << "finished"; FILE_LOG(logDEBUG) << __AT__ << "finished";
return OK; return OK;
} }
@ -1130,18 +1175,24 @@ int UDPRESTImplementation::startReceiver(char message[]){
std::string answer; std::string answer;
int code; int code;
//char *intStr = itoa(a);
//string str = string(intStr);
// TODO: remove hardcode!!!
stringstream ss;
ss << getDynamicRange();
string str_dr = ss.str();
stringstream ss2;
ss2 << getNumberOfFrames();
string str_n = ss2.str();
//test = "{\"configfile\":\"config.pu\", \"path\":\"patto\"}"; std::string request_body = "{\"settings\": {\"bit_depth\": " + str_dr + ", \"nimages\": " + str_n + "}}";
code = rest->post_json("state/configure", &answer); //std::string request_body = "{\"settings\": {\"nimages\":1, \"scanid\":999, \"bit_depth\":16}}";
std::cout << answer << std::endl; FILE_LOG(logDEBUG) << __FILE__ << "::" << " sending this configuration body: " << request_body;
code = rest->post_json("state/configure", &answer, request_body);
code = rest->get_json("state", &answer); code = rest->get_json("state", &answer);
std::cout << answer << std::endl;
code = rest->post_json("state/open", &answer); code = rest->post_json("state/open", &answer);
std::cout << answer << std::endl;
code = rest->get_json("state", &answer); code = rest->get_json("state", &answer);
std::cout << answer << std::endl;
status = slsReceiverDefs::RUNNING; status = slsReceiverDefs::RUNNING;
@ -1234,17 +1285,21 @@ int UDPRESTImplementation::stopReceiver(){
void UDPRESTImplementation::startReadout(){ void UDPRESTImplementation::startReadout(){
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting"; FILE_LOG(logDEBUG) << __AT__ << " starting";
//wait so that all packets which take time has arrived //wait so that all packets which take time has arrived
usleep(50000); usleep(50000);
status = TRANSMITTING;
/********************************************/ /********************************************/
/*
usleep(2000000); usleep(2000000);
pthread_mutex_lock(&status_mutex); pthread_mutex_lock(&status_mutex);
status = TRANSMITTING; status = TRANSMITTING;
pthread_mutex_unlock(&status_mutex); pthread_mutex_unlock(&status_mutex);
cout << "Status: Transmitting" << endl; cout << "Status: Transmitting" << endl;
*/
//kill udp socket to tell the listening thread to push last packet //kill udp socket to tell the listening thread to push last packet
shutDownUDPSockets(); shutDownUDPSockets();
@ -1256,8 +1311,9 @@ void UDPRESTImplementation::startReadout(){
void* UDPRESTImplementation::startListeningThread(void* this_pointer){ void* UDPRESTImplementation::startListeningThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
((UDPRESTImplementation*)this_pointer)->startListening(); //((UDPRESTImplementation*)this_pointer)->startListening();
return this_pointer; return this_pointer;
} }
@ -1266,7 +1322,9 @@ void* UDPRESTImplementation::startListeningThread(void* this_pointer){
void* UDPRESTImplementation::startWritingThread(void* this_pointer){ void* UDPRESTImplementation::startWritingThread(void* this_pointer){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
((UDPRESTImplementation*)this_pointer)->startWriting(); FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
//((UDPRESTImplementation*)this_pointer)->startWriting();
return this_pointer; return this_pointer;
} }
@ -1277,7 +1335,9 @@ void* UDPRESTImplementation::startWritingThread(void* this_pointer){
int UDPRESTImplementation::startListening(){ int UDPRESTImplementation::startListening(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
/*
int ithread = currentListeningThreadIndex; int ithread = currentListeningThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "In startListening() " << endl; cout << "In startListening() " << endl;
@ -1462,7 +1522,7 @@ int UDPRESTImplementation::startListening(){
pthread_exit(NULL); pthread_exit(NULL);
} }
} }
*/
return OK; return OK;
} }
@ -1480,6 +1540,8 @@ int UDPRESTImplementation::startListening(){
int UDPRESTImplementation::startWriting(){ int UDPRESTImplementation::startWriting(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __AT__ << " doing a big bunch of nothing";
/*
int ithread = currentWriterThreadIndex; int ithread = currentWriterThreadIndex;
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << ithread << "In startWriting()" <<endl; cout << ithread << "In startWriting()" <<endl;
@ -1665,7 +1727,7 @@ int loop;
#endif #endif
} }
*/
return OK; return OK;
} }
@ -1706,9 +1768,9 @@ void UDPRESTImplementation::startFrameIndices(int ithread){
} }
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){ //void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){
FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing"; // FILE_LOG(logDEBUG) << __AT__ << " called, doing nothing";
}; //};
/* /*
void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){ void UDPRESTImplementation::stopListening(int ithread, int rc, int &pc, int &t){

View File

@ -24,10 +24,73 @@
#include <string.h> #include <string.h>
#include <iostream> #include <iostream>
using namespace std; using namespace std;
#define EIGER_32BIT_INITIAL_CONSTANT 0x17c
UDPStandardImplementation::UDPStandardImplementation()
//:
//thread_started(0),
//eth(NULL),
//latestData(NULL),
//guiFileName(NULL),
//guiFrameNumber(0),
//tengigaEnable(0)
{
thread_started = 0;
eth = NULL;
latestData = NULL;
guiFileName = NULL;
guiFrameNumber = NULL;
tengigaEnable = 0;
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
}
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
pRawDataReady = NULL;
initializeMembers();
//mutex
pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&progress_mutex,NULL);
pthread_mutex_init(&write_mutex,NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
sysctl -p
// from the manual
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000
*/
}
void UDPStandardImplementation::initializeMembers(){ void UDPStandardImplementation::initializeMembers(){
myDetectorType = GENERIC; myDetectorType = GENERIC;
maxPacketsPerFile = 0; maxPacketsPerFile = 0;
@ -125,63 +188,6 @@ void UDPStandardImplementation::initializeMembers(){
} }
UDPStandardImplementation::UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG) << __FILE__ << "::" << __func__ << " starting" ;
thread_started = 0;
eth = NULL;
latestData = NULL;
guiFileName = NULL;
guiFrameNumber = 0;
tengigaEnable = 0;
for(int i=0;i<MAX_NUM_LISTENING_THREADS;i++){
udpSocket[i] = NULL;
server_port[i] = DEFAULT_UDP_PORTNO+i;
mem0[i] = NULL;
fifo[i] = NULL;
fifoFree[i] = NULL;
}
for(int i=0;i<MAX_NUM_WRITER_THREADS;i++){
singlePhotonDet[i] = NULL;
receiverdata[i] = NULL;
}
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
pRawDataReady = NULL;
UDPStandardImplementation::initializeMembers();
//mutex
pthread_mutex_init(&dataReadyMutex,NULL);
pthread_mutex_init(&status_mutex,NULL);
pthread_mutex_init(&progress_mutex,NULL);
pthread_mutex_init(&write_mutex,NULL);
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if(system("echo $((100*1024*1024)) > /proc/sys/net/core/rmem_max"))
cout << "\nWARNING: Could not change socket receiver buffer size in file /proc/sys/net/core/rmem_max" << endl;
else if(system("echo 250000 > /proc/sys/net/core/netdev_max_backlog"))
cout << "\nWARNING: Could not change max length of input queue in file /proc/sys/net/core/netdev_max_backlog" << endl;
/** permanent setting heiner
net.core.rmem_max = 104857600 # 100MiB
net.core.netdev_max_backlog = 250000
sysctl -p
// from the manual
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.netdev_max_backlog=250000
*/
}
UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called"; UDPStandardImplementation::~UDPStandardImplementation(){ FILE_LOG(logDEBUG) << __AT__ << " called";
@ -478,11 +484,15 @@ void UDPStandardImplementation::setEthernetInterface(char* c){ FILE_LOG(logDEBU
} }
void UDPStandardImplementation::setUDPPortNo(int p){ FILE_LOG(logDEBUG) << __AT__ << " called"; void UDPStandardImplementation::setUDPPortNo(int p){
FILE_LOG(logDEBUG) << __AT__ << " called";
server_port[0] = p;
}
for(int i=0;i<numListeningThreads;i++){
server_port[i] = p+i; void UDPStandardImplementation::setUDPPortNo2(int p){
} FILE_LOG(logDEBUG) << __AT__ << " called";
server_port[1] = p;
} }
@ -835,10 +845,8 @@ void UDPStandardImplementation::setupFifoStructure(){
/** acquisition functions */ /** acquisition functions */
void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum, uint32_t &fstartind){
void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
//point to gui data //point to gui data
if (guiData == NULL) if (guiData == NULL)
guiData = latestData; guiData = latestData;
@ -846,7 +854,7 @@ void UDPStandardImplementation::readFrame(char* c,char** raw, uint32_t &fnum){
//copy data and filename //copy data and filename
strcpy(c,guiFileName); strcpy(c,guiFileName);
fnum = guiFrameNumber; fnum = guiFrameNumber;
fstartind = getStartFrameIndex();
//could not get gui data //could not get gui data
if(!guiDataReady){ if(!guiDataReady){
@ -932,7 +940,17 @@ void UDPStandardImplementation::copyFrameToGui(char* startbuf[], uint32_t fnum,
int UDPStandardImplementation::createUDPSockets(){ int UDPStandardImplementation::createUDPSockets(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
int port[2];
port[0] = server_port[0];
port[1] = server_port[1];
/** eiger specific */
/*
if(bottom){
port[0] = server_port[1];
port[1] = server_port[0];
}
*/
//if eth is mistaken with ip address //if eth is mistaken with ip address
if (strchr(eth,'.')!=NULL) if (strchr(eth,'.')!=NULL)
strcpy(eth,""); strcpy(eth,"");
@ -944,23 +962,25 @@ int UDPStandardImplementation::createUDPSockets(){
cout<<"warning:eth is empty.listening to all"<<endl; cout<<"warning:eth is empty.listening to all"<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++)
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize); udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize);
} }
//normal socket //normal socket
else{ else{
cout<<"eth:"<<eth<<endl; cout<<"eth:"<<eth<<endl;
for(int i=0;i<numListeningThreads;i++) for(int i=0;i<numListeningThreads;i++)
udpSocket[i] = new genericSocket(server_port[i],genericSocket::UDP,bufferSize,eth); udpSocket[i] = new genericSocket(port[i],genericSocket::UDP,bufferSize,eth);
} }
//error //error
int iret; int iret;
for(int i=0;i<numListeningThreads;i++){ for(int i=0;i<numListeningThreads;i++){
iret = udpSocket[i]->getErrorStatus(); iret = udpSocket[i]->getErrorStatus();
if(iret){ if(!iret)
cout << "UDP port opened at port " << port[i] << endl;
else{
#ifdef VERBOSE #ifdef VERBOSE
cout << "Could not create UDP socket on port " << server_port[i] << " error:" << iret << endl; cout << "Could not create UDP socket on port " << port[i] << " error:" << iret << endl;
#endif #endif
return FAIL; return FAIL;
} }
@ -1418,7 +1438,7 @@ int UDPStandardImplementation::startReceiver(char message[]){
cout << endl << message << endl; cout << endl << message << endl;
return FAIL; return FAIL;
} }
cout << "UDP socket(s) created successfully. 1st port " << server_port[0] << endl; cout << "UDP socket(s) created successfully." << endl;
if(setupWriter() == FAIL){ if(setupWriter() == FAIL){
@ -1589,9 +1609,13 @@ int UDPStandardImplementation::startListening(){
//normal listening //normal listening
else if(!carryonBufferSize){ else if(!carryonBufferSize){
/* if(!ithread){*/
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize); rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize);
expected = maxBufferSize; expected = maxBufferSize;
/*}else{
while(1) usleep(100000000);
}
*/
} }
//the remaining packets from previous buffer //the remaining packets from previous buffer
else{ else{
@ -1608,24 +1632,26 @@ int UDPStandardImplementation::startListening(){
expected = maxBufferSize - carryonBufferSize; expected = maxBufferSize - carryonBufferSize;
} }
#ifdef VERYDEBUG //#ifdef VERYDEBUG
cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl; cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl;
#endif //#endif
//start indices for each start of scan/acquisition - eiger does it before //start indices for each start of scan/acquisition - eiger does it before
if((!measurementStarted) && (rc > 0) && (!ithread)) if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread); startFrameIndices(ithread);
//problem in receiving or end of acquisition //problem in receiving or end of acquisition
if((rc < expected)||(rc <= 0)){ if((rc < expected)||(rc <= 0)){
stopListening(ithread,rc,packetcount,total); stopListening(ithread,rc,packetcount,total);
continue; continue;
} }
/*
//start indices for each start of scan/acquisition - eiger does it before
if((!measurementStarted) && (rc > 0) && (!ithread))
startFrameIndices(ithread);
*/
//reset //reset
packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread; packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread;
@ -1811,7 +1837,10 @@ int loop;
//for progress //for progress
if(myDetectorType == EIGER){ if(myDetectorType == EIGER){
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum); tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1 if(dynamicRange != 32)
tempframenum += (startFrameIndex-1); //eiger frame numbers start at 1, so need to -1
else
tempframenum = ((tempframenum / EIGER_32BIT_INITIAL_CONSTANT) + startFrameIndex)-1;//eiger 32 bit mode is a multiple of 17c. +startframeindex for scans
}else if ((myDetectorType == GOTTHARD) && (shortFrame == -1)) }else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset); tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else else
@ -1836,6 +1865,7 @@ int loop;
if (cbAction < DO_EVERYTHING){ if (cbAction < DO_EVERYTHING){
for(i=0;i<numListeningThreads;++i) for(i=0;i<numListeningThreads;++i)
/* for eiger 32 bit mode, currframenum like gotthard, does not start from 0 or 1 */
rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady); rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
}else if (numpackets > 0){ }else if (numpackets > 0){
for(i=0;i<numListeningThreads;++i) for(i=0;i<numListeningThreads;++i)
@ -1957,8 +1987,13 @@ void UDPStandardImplementation::startFrameIndices(int ithread){
cout << "startAcquisitionIndex:" << startAcquisitionIndex<<endl; cout << "startAcquisitionIndex:" << startAcquisitionIndex<<endl;
} }
//for scans, cuz currfraenum resets //for scans, cuz currfraenum resets
else if (myDetectorType == EIGER) else if (myDetectorType == EIGER){
startFrameIndex += currframenum; if(dynamicRange == 32)
startFrameIndex = (currframenum + 1);// to be added later for scans
else
startFrameIndex += currframenum;
}
cout << "startFrameIndex:" << startFrameIndex<<endl; cout << "startFrameIndex:" << startFrameIndex<<endl;
@ -2019,9 +2054,9 @@ int i;
#endif #endif
pthread_mutex_unlock(&(status_mutex)); pthread_mutex_unlock(&(status_mutex));
#ifdef VERYDEBUG //#ifdef VERYDEBUG
cout << ithread << ": Frames listened to " << dec << ((totalListeningFrameCount[ithread]*numListeningThreads)/packetsPerFrame) << endl; cout << ithread << ": Frames listened to " << dec << ((totalListeningFrameCount[ithread]*numListeningThreads)/packetsPerFrame) << endl;
#endif //#endif
//waiting for all listening threads to be done, to print final count of frames listened to //waiting for all listening threads to be done, to print final count of frames listened to
if(ithread == 0){ if(ithread == 0){

View File

@ -37,12 +37,14 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
string udp_interface_type = "standard"; string udp_interface_type = "standard";
string rest_hostname = "localhost:8081"; string rest_hostname = "localhost:8081";
bool bottom = false; //TODO: properly set new parameter -> mode?
//parse command line for config //parse command line for config
static struct option long_options[] = { static struct option long_options[] = {
/* These options set a flag. */ /* These options set a flag. */
//{"verbose", no_argument, &verbose_flag, 1}, //{"verbose", no_argument, &verbose_flag, 1},
/* These options dont set a flag. /* These options dont set a flag.
We distinguish them by their indices. */ We distinguish them by their indices. */
{"mode", required_argument, 0, 'm'},
{"type", required_argument, 0, 't'}, {"type", required_argument, 0, 't'},
{"config", required_argument, 0, 'f'}, {"config", required_argument, 0, 'f'},
{"rx_tcpport", required_argument, 0, 'b'}, {"rx_tcpport", required_argument, 0, 'b'},
@ -55,13 +57,14 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
int c; int c;
while ( c != -1 ){ while ( c != -1 ){
c = getopt_long (argc, argv, "bfhtr", long_options, &option_index); c = getopt_long (argc, argv, "mbfhtr", long_options, &option_index);
/* Detect the end of the options. */ /* Detect the end of the options. */
if (c == -1) if (c == -1)
break; break;
switch(c){ switch(c){
case 'f': case 'f':
fname = optarg; fname = optarg;
//cout << long_options[option_index].name << " " << optarg << endl; //cout << long_options[option_index].name << " " << optarg << endl;
@ -83,6 +86,7 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
string help_message = """\nSLS Receiver Server\n\n"""; string help_message = """\nSLS Receiver Server\n\n""";
help_message += """usage: slsReceiver --config config_fname [--rx_tcpport port]\n\n"""; help_message += """usage: slsReceiver --config config_fname [--rx_tcpport port]\n\n""";
help_message += """\t--config:\t configuration filename for SLS Detector receiver\n"""; help_message += """\t--config:\t configuration filename for SLS Detector receiver\n""";
help_message += """\t--mode:\t ???\n""";
help_message += """\t--rx_tcpport:\t TCP Communication Port with the client. Default: 1954.\n\n"""; help_message += """\t--rx_tcpport:\t TCP Communication Port with the client. Default: 1954.\n\n""";
help_message += """\t--rest_hostname:\t Receiver hostname:port. It applies only to REST receivers, and indicates the hostname of the REST backend. Default: localhost:8081.\n\n"""; help_message += """\t--rest_hostname:\t Receiver hostname:port. It applies only to REST receivers, and indicates the hostname of the REST backend. Default: localhost:8081.\n\n""";

View File

@ -17,6 +17,9 @@
#include <sstream> #include <sstream>
#include <fstream> #include <fstream>
#include <stdlib.h> #include <stdlib.h>
#include <byteswap.h> //linux5
#define be64toh(x) __bswap_64 (x) //linux5
//#include <endian.h> //linux6
using namespace std; using namespace std;
@ -27,27 +30,27 @@ slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() {
closeFile(0); closeFile(0);
} }
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn, bool bot):
myDetectorType(GOTTHARD),
receiverBase(rbase),
ret(OK),
lockStatus(0),
shortFrame(-1),
packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME),
dynamicrange(16),
socket(NULL),
killTCPServerThread(0),
tenGigaEnable(0), portNumber(DEFAULT_PORTNO+2),
bottom(bot){
int port_no=portNumber;
if (pn>0)
port_no = pn;
success=OK;
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn):
myDetectorType(GOTTHARD),
receiverBase(rbase),
ret(OK),
lockStatus(0),
shortFrame(-1),
packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME),
dynamicrange(16),
socket(NULL),
killTCPServerThread(0),
tenGigaEnable(0), portNumber(DEFAULT_PORTNO+2){
int port_no=portNumber;
if (pn>0)
port_no = pn;
success=OK;
//create socket //create socket
if(success == OK){ if(success == OK){
socket = new MySocketTCP(port_no); socket = new MySocketTCP(port_no);
@ -615,10 +618,10 @@ int slsReceiverTCPIPInterface::setup_udp(){
ret=OK; ret=OK;
strcpy(mess,"could not set up udp connection"); strcpy(mess,"could not set up udp connection");
char retval[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH]="";
char args[2][MAX_STR_LENGTH]; char args[3][MAX_STR_LENGTH];
string temp; string temp;
int udpport; int udpport,udpport2;
char eth[MAX_STR_LENGTH]; char eth[MAX_STR_LENGTH];
@ -643,8 +646,9 @@ int slsReceiverTCPIPInterface::setup_udp(){
else{ else{
//set up udp port //set up udp port
sscanf(args[1],"%d",&udpport); sscanf(args[1],"%d",&udpport);
sscanf(args[2],"%d",&udpport2);
receiverBase->setUDPPortNo(udpport); receiverBase->setUDPPortNo(udpport);
receiverBase->setUDPPortNo2(udpport2);
//setup udpip //setup udpip
//get ethernet interface or IP to listen to //get ethernet interface or IP to listen to
temp = genericSocket::ipToName(args[0]); temp = genericSocket::ipToName(args[0]);
@ -1002,8 +1006,8 @@ int slsReceiverTCPIPInterface::moench_read_frame(){
else{ else{
ret = OK; ret = OK;
startIndex=receiverBase->getStartFrameIndex(); /*startIndex=receiverBase->getStartFrameIndex();*/
receiverBase->readFrame(fName,&raw,index); receiverBase->readFrame(fName,&raw,index,startIndex);
/**send garbage with -1 index to try again*/ /**send garbage with -1 index to try again*/
if (raw == NULL){ if (raw == NULL){
@ -1172,8 +1176,8 @@ int slsReceiverTCPIPInterface::gotthard_read_frame(){
cout<<"haven't caught any frame yet"<<endl; cout<<"haven't caught any frame yet"<<endl;
}else{ }else{
ret = OK; ret = OK;
startIndex=receiverBase->getStartFrameIndex(); /*startIndex=receiverBase->getStartFrameIndex();*/
receiverBase->readFrame(fName,&raw,index); receiverBase->readFrame(fName,&raw,index,startIndex);
/**send garbage with -1 index to try again*/ /**send garbage with -1 index to try again*/
if (raw == NULL){ if (raw == NULL){
@ -1304,13 +1308,9 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
char* raw = new char[frameSize]; char* raw = new char[frameSize];
char* origVal = new char[frameSize]; char* origVal = new char[frameSize];
char* retval = new char[dataSize]; char* retval = new char[dataSize];
uint32_t startIndex=0;
strcpy(mess,"Could not read frame\n"); strcpy(mess,"Could not read frame\n");
/* typedef struct{
unsigned char num1[4];
unsigned char num2[4];
} eiger_packet_header;*/
// execute action if the arguments correctly arrived // execute action if the arguments correctly arrived
#ifdef SLS_RECEIVER_UDP_FUNCTIONS #ifdef SLS_RECEIVER_UDP_FUNCTIONS
@ -1328,7 +1328,7 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
else{ else{
ret = OK; ret = OK;
/** read a frame */ /** read a frame */
receiverBase->readFrame(fName,&raw, index); receiverBase->readFrame(fName,&raw,index,startIndex);
#ifdef VERBOSE #ifdef VERBOSE
cout << "index:" << dec << index << endl; cout << "index:" << dec << index << endl;
#endif #endif
@ -1341,7 +1341,7 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
} }
/**proper frame*/ /**proper frame*/
else{ else{//cout<<"**** got proper frame ******"<<endl;
memcpy(origVal,raw,frameSize); memcpy(origVal,raw,frameSize);
raw=NULL; raw=NULL;
@ -1349,12 +1349,11 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
int c1=8;//first port int c1=8;//first port
int c2=(frameSize/2) + 8; //second port int c2=(frameSize/2) + 8; //second port
int retindex=0; int retindex=0;
int irow,ibytesperpacket,irepeat; int irow,ibytesperpacket;
int repeat=1;
int linesperpacket = (16/dynamicrange)* 1;// 16:1 line, 8:2 lines, 4:4 lines, 32: 0.5 int linesperpacket = (16/dynamicrange)* 1;// 16:1 line, 8:2 lines, 4:4 lines, 32: 0.5
int numbytesperlineperport=(EIGER_PIXELS_IN_ONE_ROW/EIGER_MAX_PORTS)*dynamicrange/8;//16:1024,8:512,4:256,32:2048 int numbytesperlineperport=(EIGER_PIXELS_IN_ONE_ROW/EIGER_MAX_PORTS)*dynamicrange/8;//16:1024,8:512,4:256,32:2048
int datapacketlength = EIGER_ONE_GIGA_ONE_DATA_SIZE; int datapacketlength = EIGER_ONE_GIGA_ONE_DATA_SIZE;
int total_num_bytes = 1040*(16*dynamicrange)*2;
if(tenGigaEnable){ if(tenGigaEnable){
linesperpacket = (16/dynamicrange)* 4;// 16:4 line, 8:8 lines, 4:16 lines, 32: 2 linesperpacket = (16/dynamicrange)* 4;// 16:4 line, 8:8 lines, 4:16 lines, 32: 2
@ -1362,51 +1361,108 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
} }
//if 1GbE, one line is split into two packets for 32 bit mode, so its special //if 1GbE, one line is split into two packets for 32 bit mode, so its special
else if(dynamicrange == 32){ else if(dynamicrange == 32){
repeat=2;
numbytesperlineperport = 1024; numbytesperlineperport = 1024;
linesperpacket = 1; //we repeat this twice anyway for 32 bit linesperpacket = 1; //we repeat this twice anyway for 32 bit
} }
if(!bottom){
for(irow=0;irow<EIGER_PIXELS_IN_ONE_COL/linesperpacket;++irow){ for(irow=0;irow<EIGER_PIXELS_IN_ONE_COL/linesperpacket;++irow){
ibytesperpacket=0; ibytesperpacket=0;
while(ibytesperpacket<datapacketlength){ while(ibytesperpacket<datapacketlength){
for(irepeat=0;irepeat<repeat;irepeat++){//only for 32 bit mode, take 2 packets from same port //first port
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport); memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport; retindex += numbytesperlineperport;
c1 += numbytesperlineperport; c1 += numbytesperlineperport;
} if(dynamicrange == 32){
for(irepeat=0;irepeat<repeat;irepeat++){//only for 32 bit mode, take 2 packets from same port c1 += 16;
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c1 += numbytesperlineperport;
c1 += 16;
}
//second port
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport); memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport; retindex += numbytesperlineperport;
c2 += numbytesperlineperport; c2 += numbytesperlineperport;
if(dynamicrange == 32){
c2 += 16;
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c2 += numbytesperlineperport;
c2 += 16;
}
ibytesperpacket += numbytesperlineperport;
}
if(dynamicrange != 32) {
c1 += 16;
c2 += 16;
} }
ibytesperpacket += numbytesperlineperport;
} }
c1 += 16;
c2 += 16; }
//bottom half module
else{
c1 = (frameSize/2) - numbytesperlineperport - 8 ;
c2 = total_num_bytes - numbytesperlineperport - 8;
for(irow=0;irow<EIGER_PIXELS_IN_ONE_COL/linesperpacket;++irow){
ibytesperpacket=0;
while(ibytesperpacket<datapacketlength){
if(dynamicrange == 32){
//first port first chip
c1 -= (numbytesperlineperport + 16);
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport;
//first port second chip
c1 += (numbytesperlineperport+16);
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c1 -= (numbytesperlineperport*2+32);//1024*2+16*2
//second port first chip
c2 -= (numbytesperlineperport + 16);
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport;
//second port second chip
c2 += (numbytesperlineperport + 16);
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c2 -= (numbytesperlineperport*2+32);
}else{
//first port
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c1 -= numbytesperlineperport;
//second port
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c2 -= numbytesperlineperport;
}
ibytesperpacket += numbytesperlineperport;
}
if(dynamicrange != 32) {
c1 -= 16;
c2 -= 16;
}
}
} }
//64 bit htonl cuz of endianness
int64_t temp;
for(i=0;i<(1024*(16*dynamicrange)*2)/8;i++){ for(i=0;i<(1024*(16*dynamicrange)*2)/8;i++){
(*(((uint64_t*)retval)+i)) = be64toh(((uint64_t)(*(((uint64_t*)retval)+i))));
/*
int64_t temp;
temp = ((uint64_t)(*(((uint64_t*)retval)+i))); temp = ((uint64_t)(*(((uint64_t*)retval)+i)));
temp = ((temp << 8) & 0xFF00FF00FF00FF00ULL ) | ((temp >> 8) & 0x00FF00FF00FF00FFULL ); temp = ((temp << 8) & 0xFF00FF00FF00FF00ULL ) | ((temp >> 8) & 0x00FF00FF00FF00FFULL );
temp = ((temp << 16) & 0xFFFF0000FFFF0000ULL ) | ((temp >> 16) & 0x0000FFFF0000FFFFULL ); temp = ((temp << 16) & 0xFFFF0000FFFF0000ULL ) | ((temp >> 16) & 0x0000FFFF0000FFFFULL );
temp = (temp << 32) | ((temp >> 32) & 0xFFFFFFFFULL); temp = (temp << 32) | ((temp >> 32) & 0xFFFFFFFFULL);
(*(((uint64_t*)retval)+i)) = temp; (*(((uint64_t*)retval)+i)) = temp;
*/
} }
/* arg = index-startIndex;
( (((val) >> 56) & 0x00000000000000FF) | (((val) >> 40) & 0x000000000000FF00) | \
(((val) >> 24) & 0x0000000000FF0000) | (((val) >> 8) & 0x00000000FF000000) | \
(((val) << 8) & 0x000000FF00000000) | (((val) << 24) & 0x0000FF0000000000) | \
(((val) << 40) & 0x00FF000000000000) | (((val) << 56) & 0xFF00000000000000) )
*/
/*
for(i=0;i<(1024*(16*dynamicrange)*2)/4;i++)
(*(((uint32_t*)retval)+i)) = htonl((uint32_t)(*(((uint32_t*)retval)+i)));
*/
arg = index-1;
} }
} }