added multiple listenign threads to incorporate multiple eiger ports, refactored the code a little bit

This commit is contained in:
Maliakal Dhanya 2014-06-16 16:53:36 +02:00
parent c922005f07
commit 3a71049f1c
5 changed files with 528 additions and 313 deletions

View File

@ -72,7 +72,6 @@ class sockaddr_in;
using namespace std;
#define DEFAULT_PACKET_SIZE 1286
#define DEFAULT_PACKETS_PER_FRAME 2
#define SOCKET_BUFFER_SIZE (100*1024*1024) //100MB
#define DEFAULT_PORTNO 1952
#define DEFAULT_BACKLOG 5
@ -91,8 +90,7 @@ enum communicationProtocol{
UDP /**< UDP */
};
genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, int t = DEFAULT_PACKETS_PER_FRAME) :
genericSocket(const char* const host_ip_or_name, unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE) :
// portno(port_number),
protocol(p),
is_a_server(0),
@ -101,8 +99,7 @@ enum communicationProtocol{
packet_size(ps),
nsending(0),
nsent(0),
total_sent(0),
packets_per_frame(t)// sender (client): where to? ip
total_sent(0)// sender (client): where to? ip
{
// strcpy(hostname,host_ip_or_name);
@ -149,7 +146,7 @@ enum communicationProtocol{
*/
genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, int t = DEFAULT_PACKETS_PER_FRAME, const char *eth=NULL):
genericSocket(unsigned short int const port_number, communicationProtocol p, int ps = DEFAULT_PACKET_SIZE, const char *eth=NULL):
//portno(port_number),
protocol(p),
is_a_server(1),
@ -158,8 +155,7 @@ enum communicationProtocol{
packet_size(ps),
nsending(0),
nsent(0),
total_sent(0),
packets_per_frame(t)
total_sent(0)
{
/* // you can specify an IP address: */
@ -562,25 +558,23 @@ enum communicationProtocol{
break;
case UDP:
if (socketDescriptor<0) return -1;
//if length given
//if length given, listens to length, else listens for packetsize till length is reached
if(length){
while(length>0){
nsending=packet_size;
nsending = (length>packet_size) ? packet_size:length;
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(!nsent) break;
length-=nsent;
total_sent+=nsent;
}
}
//depends on packets per frame
//listens to only 1 packet
else{
for(int i=0;i<packets_per_frame;i++){
nsending=packet_size;
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(!nsent) break;
length-=nsent;
total_sent+=nsent;
}
nsending=packet_size;
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(!nsent) break;
length-=nsent;
total_sent+=nsent;
}
break;
default:
@ -677,7 +671,6 @@ enum communicationProtocol{
int nsending;
int nsent;
int total_sent;
int packets_per_frame;

View File

@ -22,6 +22,7 @@
//all max frames defined in sls_receiver_defs.h. 20000 gotthard, 100000 for short gotthard, 1000 for moench, eiger 20000
#define GOTTHARD_FIFO_SIZE 25000 //cannot be less than max jobs per thread = 1000
/*#define GOTTHARD_ALIGNED_FRAME_SIZE 4096*/
#define GOTTHARD_PACKETS_PER_FRAME 2
@ -38,6 +39,7 @@
#define GOTTHARD_SHORT_PACKETS_PER_FRAME 1
#define GOTTHARD_SHORT_ONE_PACKET_SIZE 518
#define GOTTHARD_SHORT_BUFFER_SIZE 518
#define GOTTHARD_SHORT_DATABYTES 512
#define GOTTHARD_SHORT_FRAME_INDEX_MASK 0xFFFFFFFF
@ -67,17 +69,21 @@
#define MAX_EIGER_PORTS 2
#define EIGER_HEADER_LENGTH 48
#define EIGER_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
/*#define EIGER_ALIGNED_FRAME_SIZE 65536*/
#define EIGER_PACKETS_PER_FRAME 1 //default for 16B
#define EIGER_PACKETS_PER_FRAME (256*MAX_EIGER_PORTS) //default for 16B
#define EIGER_ONE_PACKET_SIZE 1040 //default for 16B
#define EIGER_BUFFER_SIZE (EIGER_ONE_PACKET_SIZE*EIGER_PACKETS_PER_FRAME) //1040*1 //default for 16B
#define EIGER_DATA_BYTES (1032*EIGER_PACKETS_PER_FRAME) //1280*40 //default for 16B
#define EIGER_BUFFER_SIZE (EIGER_ONE_PACKET_SIZE*EIGER_PACKETS_PER_FRAME) //1040*256//default for 16B
#define EIGER_DATA_BYTES (1024*EIGER_PACKETS_PER_FRAME) //1024*256 //default for 16B
#define EIGER_FRAME_INDEX_MASK 0xFFFFFF00
#define EIGER_FRAME_INDEX_OFFSET 8
#define EIGER_PACKET_INDEX_MASK 0xFF
#define EIGER_FRAME_INDEX_MASK 0xFFFF
#define EIGER_FRAME_INDEX_OFFSET 0
#define EIGER_PACKET_INDEX_MASK 0x0
#define EIGER_IMAGE_HEADER_SIZE 32

View File

@ -192,7 +192,7 @@ void slsReceiverTCPIPInterface::stop(){
cout << "Shutting down UDP Socket" << endl;
if(slsReceiverFunctions)
slsReceiverFunctions->shutDownUDPSocket();
slsReceiverFunctions->shutDownUDPSockets();
cout << "Closing Files... " << endl;
slsReceiverFunctions->closeFile();
@ -250,7 +250,7 @@ void slsReceiverTCPIPInterface::startTCPServer(){
if(v==GOODBYE){
cout << "Shutting down UDP Socket" << endl;
if(slsReceiverFunctions)
slsReceiverFunctions->shutDownUDPSocket();
slsReceiverFunctions->shutDownUDPSockets();
cout << "Closing Files... " << endl;
slsReceiverFunctions->closeFile();

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
/*#ifdef SLS_RECEIVER_UDP_FUNCTIONS*/
#ifndef SLS_RECEIVER_UDP_FUNCTIONS_H
#define SLS_RECEIVER_UDP_FUNCTIONS_H
/********************************************//**
@ -267,10 +267,10 @@ public:
void startReadout();
/**
* shuts down the udp socket
* shuts down the udp sockets
* \returns if success or fail
*/
int shutDownUDPSocket();
int shutDownUDPSockets();
private:
/**
@ -295,10 +295,10 @@ private:
void copyFrameToGui(char* startbuf);
/**
* creates udp socket
* creates udp sockets
* \returns if success or fail
*/
int createUDPSocket();
int createUDPSockets();
/**
* create listening thread
@ -366,17 +366,55 @@ private:
*/
int startWriting();
/**
* Writing to file without compression
* @param buf is the address of buffer popped out of fifo
* @param numpackets is the number of packets
* @param framenum current frame number
*/
void writeToFile_withoutCompression(char* buf,int numpackets);
void writeToFile_withoutCompression(char* buf,int numpackets, uint32_t framenum);
/**
* Its called for the first packet of a scan or acquistion
* Sets the startframeindices and the variables to know if acquisition started
* @param ithread listening thread number
*/
void startFrameIndices(int ithread);
/**
* This is called when udp socket is shut down
* It pops ffff instead of packet number into fifo
* to inform writers about the end of listening session
* @param ithread listening thread number
* @param rc number of bytes received
* @param pc packet count
* @param t total packets listened to
*/
void stopListening(int ithread, int rc, int &pc, int &t);
/** structure of an eiger image header*/
typedef struct
{
unsigned char header_before[20];
unsigned char fnum[4];
unsigned char header_after[24];
} eiger_image_header;
/** structure of an eiger image header*/
typedef struct
{
unsigned char num1[4];
unsigned char num2[4];
} eiger_packet_header;
/** max number of listening threads */
const static int MAX_NUM_LISTENING_THREADS = MAX_EIGER_PORTS;
/** max number of writer threads */
const static int MAX_NUM_WRITER_THREADS = 15;
/** Eiger Receiver */
EigerReceiver *receiver;
@ -390,10 +428,10 @@ private:
runStatus status;
/** UDP Socket between Receiver and Detector */
genericSocket* udpSocket;
genericSocket* udpSocket[MAX_NUM_LISTENING_THREADS];
/** Server UDP Port*/
int server_port;
int server_port[MAX_NUM_LISTENING_THREADS];
/** ethernet interface or IP to listen to */
char *eth;
@ -482,7 +520,10 @@ private:
/** Previous Frame number from buffer */
uint32_t prevframenum;
/** buffer size can be 1286*2 or 518 or 1286*40 */
/** size of one frame */
int frameSize;
/** buffer size. different from framesize as we wait for one packet instead of frame for eiger */
int bufferSize;
/** oen buffer size */
@ -509,23 +550,23 @@ private:
/** number of jobs per thread for data compression */
int numJobsPerThread;
/** memory allocated for the buffer */
char *mem0;
/** datacompression - save only hits */
bool dataCompression;
/** memory allocated for the buffer */
char *mem0[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data read */
CircularFifo<char>* fifo;
CircularFifo<char>* fifo[MAX_NUM_LISTENING_THREADS];
/** circular fifo to store addresses of data already written and ready to be resued*/
CircularFifo<char>* fifoFree;
CircularFifo<char>* fifoFree[MAX_NUM_LISTENING_THREADS];
/** Receiver buffer */
char *buffer;
char *buffer[MAX_NUM_LISTENING_THREADS];
/** max number of writer threads */
const static int MAX_NUM_WRITER_THREADS = 15;
/** number of writer threads */
int numListeningThreads;
/** number of writer threads */
int numWriterThreads;
@ -533,19 +574,25 @@ private:
/** to know if listening and writer threads created properly */
int thread_started;
/** current listening thread index*/
int currentListeningThreadIndex;
/** current writer thread index*/
int currentWriterThreadIndex;
/** thread listening to packets */
pthread_t listening_thread;
pthread_t listening_thread[MAX_NUM_LISTENING_THREADS];
/** thread writing packets */
pthread_t writing_thread[MAX_NUM_WRITER_THREADS];
/** total frame count the listening thread has listened to */
int totalListeningFrameCount;
int totalListeningFrameCount[MAX_NUM_LISTENING_THREADS];
/** mask showing which threads are running */
/** mask showing which listening threads are running */
volatile uint32_t listeningthreads_mask;
/** mask showing which writer threads are running */
volatile uint32_t writerthreads_mask;
/** mask showing which threads have created files*/
@ -554,11 +601,8 @@ private:
/** OK if file created was successful */
int ret_createfile;
/** 0 if listening thread is idle, 1 otherwise */
int listening_thread_running;
/** variable used to self terminate threads waiting for semaphores */
int killListeningThread;
int killAllListeningThreads;
/** variable used to self terminate threads waiting for semaphores */
int killAllWritingThreads;
@ -569,8 +613,8 @@ private:
//semaphores
/** semaphore to synchronize writer and guireader threads */
sem_t smp;
/** semaphore to synchronize listener thread */
sem_t listensmp;
/** semaphore to synchronize listener threads */
sem_t listensmp[MAX_NUM_LISTENING_THREADS];
/** semaphore to synchronize writer threads */
sem_t writersmp[MAX_NUM_WRITER_THREADS];
@ -687,4 +731,4 @@ public:
#endif
#endif
/*#endif*/