eiger receiver, receiving many packets at a time, with 16,8, 4 bitmode sort of working

This commit is contained in:
Maliakal Dhanya 2014-07-02 10:51:13 +02:00
parent 5f82381b1e
commit 8369153d60
8 changed files with 285 additions and 244 deletions

View File

@ -570,10 +570,9 @@ enum communicationProtocol{
}
//listens to only 1 packet
else{
//normal
nsending=packet_size;
nsent = recvfrom(socketDescriptor,(char*)buf+total_sent,nsending, 0, (struct sockaddr *) &clientAddress, &clientAddress_length);
if(!nsent) break;
length-=nsent;
total_sent+=nsent;
}
break;

View File

@ -38,7 +38,9 @@ public:
array.resize(Capacity);
sem_init(&free_mutex,0,0);
}
virtual ~CircularFifo() {}
virtual ~CircularFifo() {
sem_destroy(&free_mutex);
}
bool push(Element*& item_);
bool pop(Element*& item_);
@ -151,111 +153,3 @@ unsigned int CircularFifo<Element>::increment(unsigned int idx_) const
}
#endif /* CIRCULARFIFO_H_ */
/*
#ifndef CIRCULARFIFO_H_
#define CIRCULARFIFO_H_
#include "sls_receiver_defs.h"
#include "/usr/include/alsa/atomic.h"
#include <vector>
using namespace std;
template<typename Element>
class CircularFifo {
public:
CircularFifo(unsigned int Size) : tail(0), head(0){
Capacity = Size + 1;
array.resize(Capacity);
}
virtual ~CircularFifo() {}
bool push(Element*& item_);
bool pop(Element*& item_);
bool wasEmpty() const;
bool wasFull() const;
bool isLockFree() const;
private:
vector <Element*> array;
unsigned int Capacity;
std::atomic<unsigned int> tail; // input index
std::atomic<unsigned int> head; // output index
unsigned int increment(unsigned int idx_) const;
};
template<typename Element>
bool CircularFifo<Element>::push(Element*& item_)
{
auto currentTail = tail.load();
auto nextTail = increment(currentTail);
if(nextTail != head.load())
{
array[currentTail] = item_;
tail.store(nextTail);
return true;
}
// queue was full
return false;
}
template<typename Element>
bool CircularFifo<Element>::pop(Element*& item_)
{
const auto currentHead = head.load();
if(currentHead == tail.load())
return false; // empty queue
item_ = array[currentHead];
head.store(increment(currentHead));
return true;
}
template<typename Element>
bool CircularFifo<Element>::wasEmpty() const
{
return (head.load() == tail.load());
}
template<typename Element>
bool CircularFifo<Element>::wasFull() const
{
const auto nextTail = increment(tail.load());
return (nextTail == head.load());
}
template<typename Element>
bool CircularFifo<Element>::isLockFree() const
{
return (tail.is_lock_free() && head.is_lock_free());
}
template<typename Element>
unsigned int CircularFifo<Element>::increment(unsigned int idx_) const
{
// increment or wrap
// =================
// index++;
// if(index == array.lenght) -> index = 0;
//
//or as written below:
// index = (index+1) % array.length
return (idx_ + 1) % Capacity;
}
#endif */

View File

@ -69,21 +69,24 @@
#define MAX_EIGER_PORTS 2
#define EIGER_MAX_PORTS 2
#define EIGER_HEADER_LENGTH 48
#define EIGER_FIFO_SIZE 2500 //cannot be less than max jobs per thread = 1000
#define EIGER_FIFO_SIZE 250 //cannot be less than max jobs per thread = 1000
/*#define EIGER_ALIGNED_FRAME_SIZE 65536*/
#define EIGER_PACKETS_PER_FRAME (256*MAX_EIGER_PORTS) //default for 16B
#define EIGER_ONE_PACKET_SIZE 1040 //default for 16B
#define EIGER_BUFFER_SIZE (EIGER_ONE_PACKET_SIZE*EIGER_PACKETS_PER_FRAME) //1040*256//default for 16B
#define EIGER_DATA_BYTES (1024*EIGER_PACKETS_PER_FRAME) //1024*256 //default for 16B
#define EIGER_PACKETS_PER_FRAME_COSTANT (16*EIGER_MAX_PORTS)//*bit mode 4*16=64, 8*16=128, 16*16=256, 32*16=512
#define EIGER_ONE_PACKET_SIZE 1040
#define EIGER_ONE_DATA_SIZE 1024
#define EIGER_BUFFER_SIZE_CONSTANT (EIGER_ONE_PACKET_SIZE*EIGER_PACKETS_PER_FRAME_COSTANT)//1040*16*2//*bit mode
#define EIGER_DATA_BYTES_CONSTANT (EIGER_ONE_DATA_SIZE*EIGER_PACKETS_PER_FRAME_COSTANT) //1024*16*2//*bit mode
#define EIGER_FRAME_INDEX_MASK 0xFFFF
#define EIGER_FRAME_INDEX_OFFSET 0
#define EIGER_PACKET_INDEX_MASK 0x0
#define EIGER_IMAGE_HEADER_SIZE 32
#define EIGER_IMAGE_HEADER_SIZE 48
#define EIGER_PIXELS_IN_ONE_ROW (256*4)

View File

@ -46,7 +46,9 @@ enum {
F_RESET_RECEIVER_FRAMES_CAUGHT, /**< resets the frames caught by receiver */
F_ENABLE_RECEIVER_FILE_WRITE, /**< sets the receiver file write */
F_ENABLE_RECEIVER_COMPRESSION, /**< enable compression in receiver */
F_ENABLE_RECEIVER_OVERWRITE /**< set overwrite flag in receiver */
F_ENABLE_RECEIVER_OVERWRITE, /**< set overwrite flag in receiver */
F_ENABLE_RECEIVER_TEN_GIGA /**< enable 10Gbe in receiver */
/* Always append functions hereafter!!! */
};

View File

@ -34,6 +34,7 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int argc, char *argv[], int
lockStatus(0),
shortFrame(-1),
packetsPerFrame(GOTTHARD_PACKETS_PER_FRAME),
dynamicrange(16),
socket(NULL),
killTCPServerThread(0){
@ -1347,37 +1348,23 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
int arg = -1,i;
uint32_t index=0;
int bufferSize = EIGER_BUFFER_SIZE;
int onebuffersize = EIGER_BUFFER_SIZE/EIGER_PACKETS_PER_FRAME;
int onedatasize = EIGER_DATA_BYTES/EIGER_PACKETS_PER_FRAME;
int numpackets = EIGER_PACKETS_PER_FRAME;
int bufferSize = EIGER_BUFFER_SIZE_CONSTANT * dynamicrange;
char* raw = new char[bufferSize];
char* origVal = new char[bufferSize];
char* retval = new char[EIGER_DATA_BYTES];
/*
//retval is a full frame
int rnel = bufferSize/(sizeof(int));
int* retval = new int[rnel];
int* origVal = new int[rnel];
//all initialized to 0
for(i=0;i<rnel;i++) retval[i]=0;
for(i=0;i<rnel;i++) origVal[i]=0;
*/
char* retval = new char[(EIGER_DATA_BYTES_CONSTANT*dynamicrange)];
strcpy(mess,"Could not read frame\n");
/*
typedef struct
{
unsigned char num1[4];
unsigned char num2[4];
} eiger_packet_header;
*/
// execute action if the arguments correctly arrived
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
//#ifdef SLS_RECEIVER_UDP_FUNCTIONS
@ -1409,15 +1396,78 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
raw=NULL;
int copyindex=8;
int retindex=0;
for(int i=0;i<numpackets;++i){
//cout<<"p"<<i<<":"<<(htonl(*(uint32_t*)((eiger_packet_header *)((uint32_t*)(origVal[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num2)&0xff)<<"\t\t";
memcpy(retval+retindex ,origVal+copyindex ,onedatasize);
copyindex += 16+onedatasize;
retindex += onedatasize;
int c1=8;
int c2=(bufferSize/2) + 8; //only 2 ports
int retindex=0;
int irow,ibytesperpacket,irepeat;
int repeat=1;
int linesperpacket = 16/dynamicrange;// 16:1 line, 8:2 lines, 4:4 lines, 32: 0.5
int numbytesperlineperport=(EIGER_PIXELS_IN_ONE_ROW/EIGER_MAX_PORTS)*dynamicrange/8;//16:1024,8:512,4:256,32:2048
if(dynamicrange == 32){
repeat=2;
numbytesperlineperport = 1024;
linesperpacket = 1; //we repeat this twice anyway
}
//cout <<"linesperpacket:" <<dec<<linesperpacket <<" numbytesperlineperport:"<<numbytesperlineperport<<endl;
//for each
for(irow=0;irow<256/linesperpacket;++irow){
//cout <<"irow:"<<irow<<endl;
ibytesperpacket=0;
while(ibytesperpacket<1024){
//cout <<"ibytesperpacket:"<<ibytesperpacket<<endl;
for(irepeat=0;irepeat<repeat;irepeat++){//only for 32 bit mode, take 2 packets from same port
//cout <<"irepeat:"<<irepeat<<" c1:"<<c1<<" retindex:"<<retindex<<endl;
memcpy(retval+retindex ,origVal+c1 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c1 += numbytesperlineperport;
}
for(irepeat=0;irepeat<repeat;irepeat++){//only for 32 bit mode, take 2 packets from same port
//cout <<"irepeat:"<<irepeat<<" c2:"<<c2<<" retindex:"<<retindex<<endl;
memcpy(retval+retindex ,origVal+c2 ,numbytesperlineperport);
retindex += numbytesperlineperport;
c2 += numbytesperlineperport;
}
ibytesperpacket += numbytesperlineperport;
}
c1 += 16;
c2 += 16;
}
/*
for(i=0;i<(packetsPerFrame/EIGER_MAX_PORTS);++i){
//cout<<i<<" p1:"<<dec<<(htonl(*(uint32_t*)((eiger_packet_header *)((uint32_t*)(origVal + (c1-8))))->num2)&0xff)<<"\t\t";
memcpy(retval+retindex ,origVal+c1 ,EIGER_ONE_DATA_SIZE);
c1 += 16+EIGER_ONE_DATA_SIZE;
retindex += EIGER_ONE_DATA_SIZE;
//cout<<i<<" p2:"<<dec<<(htonl(*(uint32_t*)((eiger_packet_header *)((uint32_t*)(origVal + (c2-8))))->num2)&0xff)<<endl;
memcpy(retval+retindex ,origVal+c2 ,EIGER_ONE_DATA_SIZE);
c2 += 16+EIGER_ONE_DATA_SIZE;
retindex += EIGER_ONE_DATA_SIZE;
}
*/
for(i=0;i<(1024*(16*dynamicrange)*2)/4;i++)
(*((uint32_t*)retval+i)) = htonl((uint32_t)(*((uint32_t*)retval+i)));
/*
for(int j=25;j<27;++j)
for(int i=1000;i<1010;i=i+2)
//cout<<"retval:"<<dec<<i<<hex<<":\t0x"<<htonl((uint32_t)(*((uint32_t*)(retval+ (EIGER_ONE_DATA_SIZE)+i))))<<endl;
cout<<"retval:"<<dec<<i<<hex<<":\t0x"<<((uint16_t)(*((uint16_t*)(retval+ j* 2048+(1024)+i))))<<endl;
*/
arg = index-1;
}
}
@ -1431,7 +1481,7 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
#endif
//#endif
if(ret==OK && socket->differentClients){
cout << "Force update" << endl;
@ -1447,7 +1497,7 @@ int slsReceiverTCPIPInterface::eiger_read_frame(){
else{
socket->SendDataOnly(fName,MAX_STR_LENGTH);
socket->SendDataOnly(&arg,sizeof(arg));
socket->SendDataOnly(retval,EIGER_DATA_BYTES);
socket->SendDataOnly(retval,(EIGER_DATA_BYTES_CONSTANT*dynamicrange));
}
delete [] retval;
@ -1799,8 +1849,24 @@ int slsReceiverTCPIPInterface::set_dynamic_range() {
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}
else
else if(myDetectorType == EIGER){
switch(dr){
case 4:
case 8:
case 16:
case 32:break;
default:
sprintf(mess,"This dynamic range does not exist for eiger: %d\n",dr);
ret=FAIL;
break;
}
}
if(ret!=FAIL){
retval=slsReceiverFunctions->setDynamicRange(dr);
dynamicrange = dr;
if(myDetectorType == EIGER)
packetsPerFrame = dr*EIGER_PACKETS_PER_FRAME_COSTANT;
}
}
#ifdef VERBOSE
if(ret!=FAIL)
@ -1883,8 +1949,52 @@ int slsReceiverTCPIPInterface::enable_overwrite() {
int slsReceiverTCPIPInterface::enable_tengiga() {
ret=OK;
int retval=-1;
int val;
strcpy(mess,"Could not enable/disable 10Gbe\n");
// receive arguments
if(socket->ReceiveDataOnly(&val,sizeof(val)) < 0 ){
strcpy(mess,"Error reading from socket\n");
ret = FAIL;
}
// execute action if the arguments correctly arrived
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (ret==OK) {
if (lockStatus==1 && socket->differentClients==1){
sprintf(mess,"Receiver locked by %s\n", socket->lastClientIP);
ret=FAIL;
}
else
;//retval=slsReceiverFunctions->enable10GbE(val);
}
#ifdef VERBOSE
if(ret!=FAIL)
cout << "10Gbe:" << val << endl;
else
cout << mess << endl;
#endif
#endif
if(ret==OK && socket->differentClients){
cout << "Force update" << endl;
ret=FORCE_UPDATE;
}
// send answer
socket->SendDataOnly(&ret,sizeof(ret));
if(ret==FAIL)
socket->SendDataOnly(mess,sizeof(mess));
socket->SendDataOnly(&retval,sizeof(retval));
//return ok/fail
return ret;
}
@ -2063,8 +2173,7 @@ int slsReceiverTCPIPInterface::send_update() {
//index
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
/*if(myDetectorType != EIGER)*/
ind=slsReceiverFunctions->getFileIndex();
ind=slsReceiverFunctions->getFileIndex();
socket->SendDataOnly(&ind,sizeof(ind));
#endif

View File

@ -193,6 +193,8 @@ private:
/** enable overwrite */
int enable_overwrite();
/** enable 10Gbe */
int enable_tengiga();
//General Functions
/** Locks Receiver */
@ -246,6 +248,9 @@ private:
/** Packets per frame */
int packetsPerFrame;
/** Dynamic Range */
int dynamicrange;
/** kill tcp server thread */
int killTCPServerThread;

View File

@ -143,7 +143,7 @@ void slsReceiverUDPFunctions::initializeMembers(){
frameIndexOffset = 0;
acquisitionPeriod = SAMPLE_TIME_IN_NS;
numberOfFrames = 0;
dynamicRange = 0;
dynamicRange = 16;
shortFrame = -1;
currframenum = 0;
prevframenum = 0;
@ -262,11 +262,11 @@ int slsReceiverUDPFunctions::setDetectorType(detectorType det){
#ifndef EIGERSLS
cout << "SLS Eiger Receiver" << endl;
fifosize = EIGER_FIFO_SIZE;
packetsPerFrame = EIGER_PACKETS_PER_FRAME;
packetsPerFrame = EIGER_PACKETS_PER_FRAME_COSTANT * dynamicRange;
onePacketSize = EIGER_ONE_PACKET_SIZE;
frameSize = EIGER_BUFFER_SIZE;
bufferSize = EIGER_ONE_PACKET_SIZE; // because if you listen to a whole frame on one port, it will be interleaved
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * EIGER_PACKETS_PER_FRAME;
frameSize = EIGER_BUFFER_SIZE_CONSTANT * dynamicRange;
bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//for only one port
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame;
frameIndexMask = EIGER_FRAME_INDEX_MASK;
frameIndexOffset = EIGER_FRAME_INDEX_OFFSET;
packetIndexMask = EIGER_PACKET_INDEX_MASK;
@ -522,11 +522,52 @@ int32_t slsReceiverUDPFunctions::setScanTag(int32_t stag){
}
int32_t slsReceiverUDPFunctions::setDynamicRange(int32_t dr){
cout << "Setting Dynamic Range" << endl;
int olddr = dynamicRange;
if(dr >= 0){
if(receiver != NULL)
receiver->setDynamicRange(dr);
else
else{
dynamicRange = dr;
packetsPerFrame = EIGER_PACKETS_PER_FRAME_COSTANT * dynamicRange;
frameSize = EIGER_BUFFER_SIZE_CONSTANT * dynamicRange;
bufferSize = (frameSize/EIGER_MAX_PORTS) + EIGER_HEADER_LENGTH;//for only one port
maxPacketsPerFile = EIGER_MAX_FRAMES_PER_FILE * packetsPerFrame;
if(olddr != dr){
//del
if(thread_started){
createListeningThreads(true);
createWriterThreads(true);
}
for(int i=0;i<numListeningThreads;i++){
if(mem0[i]) {free(mem0[i]); mem0[i] = NULL;}
if(fifo[i]) {delete fifo[i]; fifo[i] = NULL;}
if(fifoFree[i]) {delete fifoFree[i]; fifoFree[i] = NULL;}
buffer[i] = NULL;
}
if(latestData) {delete [] latestData; latestData = NULL;}
latestData = new char[frameSize];
numJobsPerThread = -1;
setupFifoStructure();
if(createListeningThreads() == FAIL){
cout << "ERROR: Could not create listening thread" << endl;
exit (-1);
}
if(createWriterThreads() == FAIL){
cout << "ERROR: Could not create writer threads" << endl;
exit (-1);
}
setThreadPriorities();
}
}
}
if(receiver != NULL)
@ -842,10 +883,21 @@ void slsReceiverUDPFunctions::copyFrameToGui(char* startbuf[], uint32_t fnum, ch
guiDataReady=0;
//eiger
if(startbuf != NULL){
for(int i=numListeningThreads;i<packetsPerFrame+numListeningThreads;++i){
memcpy((((char*)latestData)+((i-numListeningThreads)*bufferSize)) ,startbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS,bufferSize);
//cout<<"p"<<i<<":"<<(htonl(*(uint32_t*)((eiger_packet_header *)((uint32_t*)(startbuf[i] + HEADER_SIZE_NUM_TOT_PACKETS)))->num2)&0xff)<<"\t\t";
int offset = 0;
int size = frameSize/EIGER_MAX_PORTS;
for(int j=0;j<numListeningThreads;++j){
memcpy((((char*)latestData)+offset) ,startbuf[j] + (HEADER_SIZE_NUM_TOT_PACKETS + EIGER_HEADER_LENGTH),size);
offset += size;
}
/*
for(int j=25;j<27;++j)
for(int i=1000;i<1010;i=i+2)
//cout<<"startbuf:"<<dec<<i<<hex<<":\t0x"<<htonl((uint32_t)(*((uint32_t*)(startbuf[1] + HEADER_SIZE_NUM_TOT_PACKETS+ EIGER_HEADER_LENGTH+8+ i))))<<endl;
cout<<"startbuf:"<<dec<<i<<hex<<":\t0x"<<((uint16_t)(*((uint16_t*)(startbuf[1] + 2+ 48+ j*1040+8+ i))))<<endl;
*/
guiFrameNumber = fnum;
}else//other detectors
memcpy(latestData,buf,bufferSize);
@ -1417,7 +1469,7 @@ void slsReceiverUDPFunctions::startReadout(){
usleep(50000);
/********************************************/
usleep(1000000);
usleep(2000000);
pthread_mutex_lock(&status_mutex);
status = TRANSMITTING;
pthread_mutex_unlock(&status_mutex);
@ -1457,7 +1509,7 @@ int slsReceiverUDPFunctions::startListening(){
thread_started = 1;
int i,total;
int lastpacketoffset, expected, rc, packetcount, maxBufferSize, carryonBufferSize;
int lastpacketoffset, expected, rc, rc1,packetcount, maxBufferSize, carryonBufferSize;
uint32_t lastframeheader;// for moench to check for all the packets in last frame
char* tempchar = NULL;
int imageheader = 0;
@ -1476,10 +1528,7 @@ int slsReceiverUDPFunctions::startListening(){
if(tempchar) {delete [] tempchar;tempchar = NULL;}
if(myDetectorType == EIGER)
tempchar = new char[bufferSize];
else
tempchar = new char[onePacketSize * (packetsPerFrame - 1)]; //gotthard: 1packet size, moench:39 packet size
tempchar = new char[onePacketSize * ((packetsPerFrame/numListeningThreads) - 1)]; //gotthard: 1packet size, moench:39 packet size
while((1<<ithread)&listeningthreads_mask){
@ -1501,24 +1550,10 @@ int slsReceiverUDPFunctions::startListening(){
}
//normal listening
else if(!carryonBufferSize){
//eiger
if (imageheader){
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS);
//if it was the header
if(rc == EIGER_HEADER_LENGTH){
#ifdef VERYDEBUG
cout << "rc for header2:" << dec << rc << endl;
#endif
expected = EIGER_HEADER_LENGTH;
}else{
expected = maxBufferSize;
}
}
//not eiger
else{
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize);
expected = maxBufferSize;
}
rc = udpSocket[ithread]->ReceiveDataOnly(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS, maxBufferSize);
expected = maxBufferSize;
}
//the remaining packets from previous buffer
else{
@ -1536,10 +1571,17 @@ int slsReceiverUDPFunctions::startListening(){
}
#ifdef VERYDEBUG
cout << ithread << " *** rc:" << dec << rc << endl;
cout << ithread << " *** expected:" << dec << expected << endl;
cout << ithread << " *** rc:" << dec << rc << ". expected:" << dec << expected << endl;
#endif
/*
if(ithread){
for(int j=25;j<27;++j)
for(int i=1000;i<1010;i=i+2)
//cout<<"startbuf:"<<dec<<i<<hex<<":\t0x"<<htonl((uint32_t)(*((uint32_t*)(buffer[ithread] + HEADER_SIZE_NUM_TOT_PACKETS+ EIGER_HEADER_LENGTH+ 8+i))))<<endl;
cout<<"startbuf:"<<dec<<i<<hex<<":\t0x"<<((uint16_t)(*((uint16_t*)(buffer[ithread] + 2+ 48+ j*1040+8+i))))<<endl;
}
*/
@ -1556,7 +1598,7 @@ int slsReceiverUDPFunctions::startListening(){
//reset
packetcount = packetsPerFrame * numJobsPerThread;
packetcount = (packetsPerFrame/numListeningThreads) * numJobsPerThread;
carryonBufferSize = 0;
@ -1617,10 +1659,7 @@ int slsReceiverUDPFunctions::startListening(){
#endif
break;
default:
if(rc==EIGER_HEADER_LENGTH)
packetcount=0;
else
packetcount = 1;
break;
}
@ -1649,6 +1688,7 @@ int slsReceiverUDPFunctions::startListening(){
//make sure its not exiting thread
if(killAllListeningThreads){
cout << ithread << " good bye listening thread" << endl;
if(tempchar) {delete [] tempchar;tempchar = NULL;}
pthread_exit(NULL);
}
}
@ -1678,17 +1718,18 @@ int slsReceiverUDPFunctions::startWriting(){
int numpackets, nf;
uint32_t tempframenum;
char* wbuf[packetsPerFrame+numListeningThreads];//interleaved + 2 ports header
char *d=new char[bufferSize];
char* wbuf[numListeningThreads];//interleaved
char *d=new char[bufferSize*numListeningThreads];
int xmax=0,ymax=0;
int ret,i,j;
int w_index=0;
int packetsPerThread = packetsPerFrame/numListeningThreads;
int loop;
while(1){
nf = 0;
w_index = 0;
packetsPerThread = packetsPerFrame/numListeningThreads;
if(myDetectorType == MOENCH){
xmax = MOENCH_PIXELS_IN_ONE_ROW-1;
ymax = MOENCH_PIXELS_IN_ONE_ROW-1;
@ -1711,40 +1752,36 @@ int slsReceiverUDPFunctions::startWriting(){
#endif
//pop
for(i=0;i<numListeningThreads;++i){
fifo[i]->pop(wbuf[w_index+i]);
numpackets = (uint16_t)(*((uint16_t*)wbuf[w_index]));
fifo[i]->pop(wbuf[i]);
numpackets = (uint16_t)(*((uint16_t*)wbuf[i]));
#ifdef VERYDEBUG
if((!w_index)||(!numpackets)) cout << ithread << " numpackets:" << dec << numpackets << endl;
cout << ithread << " numpackets:" << dec << numpackets << endl;
#endif
}
#ifdef VERYDEBUG
cout << ithread << " numpackets:" << dec << numpackets << endl;
cout << ithread << " *** writer popped from fifo " << (void*) wbuf[w_index]<< endl;
cout << ithread << " *** writer popped from fifo " << (void*) wbuf[w_index+1]<< endl;
cout << ithread << " *** writer popped from fifo " << (void*) wbuf[0]<< endl;
cout << ithread << " *** writer popped from fifo " << (void*) wbuf[1]<< endl;
#endif
//last dummy packet
if(numpackets == 0xFFFF){
stopWriting(ithread,wbuf,w_index);
stopWriting(ithread,wbuf);
continue;
}
//for progress
if(myDetectorType == EIGER){
if(!numpackets)
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[w_index] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
}else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[w_index] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
tempframenum = (((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[w_index] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
tempframenum = ((((uint32_t)(*((uint32_t*)(wbuf[ithread] + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
if(numWriterThreads == 1)
currframenum = tempframenum;
@ -1754,12 +1791,9 @@ int slsReceiverUDPFunctions::startWriting(){
currframenum = tempframenum;
pthread_mutex_unlock(&progress_mutex);
}
#ifdef VERYDEBUG
if(!numpackets) cout << endl <<ithread << " tempframenum:" << dec << tempframenum << " curframenum:" << currframenum << endl;
#endif
//#ifdef VERYDEBUG
cout << endl <<ithread << " tempframenum:" << hex << tempframenum << " curframenum:" << currframenum << endl;
//#endif
//without datacompression: write datacall back, or write data, free fifo
@ -1767,25 +1801,23 @@ int slsReceiverUDPFunctions::startWriting(){
if (cbAction < DO_EVERYTHING){
for(i=0;i<numListeningThreads;++i)
rawDataReadyCallBack(currframenum, wbuf[w_index+i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
rawDataReadyCallBack(currframenum, wbuf[i], numpackets * onePacketSize, sfilefd, guiData,pRawDataReady);
}else if (numpackets > 0){
for(i=0;i<numListeningThreads;++i)
writeToFile_withoutCompression(wbuf[w_index+i], numpackets,currframenum);
writeToFile_withoutCompression(wbuf[i], numpackets,currframenum);
}
if(myDetectorType == EIGER) {
//last packet
if((packetsPerThread-1)==(htonl(*(uint32_t*)((eiger_packet_header *)((uint32_t*)(wbuf[w_index] + HEADER_SIZE_NUM_TOT_PACKETS)))->num2)&0xff)){
copyFrameToGui(wbuf,currframenum);
for(j=0;j<w_index;j=j+numListeningThreads){
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuf[j+i]));
}
}
w_index = 0;
}else
w_index+=numListeningThreads;
copyFrameToGui(wbuf,currframenum);
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuf[i]));
#ifdef VERYDEBUG
cout << ithread << ":" << i+j << " fifo freed:" << (void*)wbuf[i] << endl;
#endif
}
}
else{
//copy to gui
@ -1972,20 +2004,20 @@ int i;
void slsReceiverUDPFunctions::stopWriting(int ithread, char* wbuffer[], int wIndex){
void slsReceiverUDPFunctions::stopWriting(int ithread, char* wbuffer[]){
int i,j;
#ifdef VERYDEBUG
cout << ithread << " **********************popped last dummy frame:" << (void*)wbuffer[wIndex] << endl;
#endif
//free fifo
for(i=0;i<numListeningThreads;++i)
for(j=0;j<wIndex;++j){
while(!fifoFree[i]->push(wbuffer[wIndex+j]));
for(i=0;i<numListeningThreads;++i){
while(!fifoFree[i]->push(wbuffer[i]));
#ifdef VERYDEBUG
cout << ithread << " fifo freed:" << (void*)wbuffer[wIndex+j] << endl;
cout << ithread << ":" << i<< " fifo freed:" << (void*)wbuffer[i] << endl;
#endif
}
}
//all threads need to close file, reset mask and exit loop
@ -2047,13 +2079,13 @@ void slsReceiverUDPFunctions::writeToFile_withoutCompression(char* buf,int numpa
if((enableFileWrite) && (sfilefd)){
offset = HEADER_SIZE_NUM_TOT_PACKETS;
if(myDetectorType == EIGER)
offset += EIGER_HEADER_LENGTH;
while(numpackets > 0){
//for progress and packet loss calculation(new files)
if(myDetectorType == EIGER){
if(((uint16_t)(*((uint16_t*)buf)))==0)
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buf + HEADER_SIZE_NUM_TOT_PACKETS)))->fnum);
}else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
if(myDetectorType == EIGER);
else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf + HEADER_SIZE_NUM_TOT_PACKETS))))& (frameIndexMask)) >> frameIndexOffset);
@ -2077,7 +2109,7 @@ void slsReceiverUDPFunctions::writeToFile_withoutCompression(char* buf,int numpa
packetsToSave = maxPacketsPerFile - packetsInFile;
if(packetsToSave > numpackets)
packetsToSave = numpackets;
/**next time offset is still plus header length*/
fwrite(buf+offset, 1, packetsToSave * onePacketSize, sfilefd);
packetsInFile += packetsToSave;
packetsCaught += packetsToSave;
@ -2088,10 +2120,8 @@ void slsReceiverUDPFunctions::writeToFile_withoutCompression(char* buf,int numpa
if(packetsInFile >= maxPacketsPerFile){
//for packet loss
lastpacket = (((packetsToSave - 1) * onePacketSize) + offset);
if(myDetectorType == EIGER){
if(((uint16_t)(*((uint16_t*)buf)))==0)
tempframenum = htonl(*(unsigned int*)((eiger_image_header *)((char*)(buf + lastpacket)))->fnum);
}else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
if(myDetectorType == EIGER);
else if ((myDetectorType == GOTTHARD) && (shortFrame == -1))
tempframenum = (((((uint32_t)(*((uint32_t*)(buf + lastpacket))))+1)& (frameIndexMask)) >> frameIndexOffset);
else
tempframenum = ((((uint32_t)(*((uint32_t*)(buf + lastpacket))))& (frameIndexMask)) >> frameIndexOffset);

View File

@ -397,9 +397,8 @@ private:
* When acquisition is over, this is called
* @param ithread listening thread number
* @param wbuffer writer buffer
* @param wIndex writer Index
*/
void stopWriting(int ithread, char* wbuffer[], int wIndex);
void stopWriting(int ithread, char* wbuffer[]);
/**
@ -432,7 +431,7 @@ private:
} eiger_packet_header;
/** max number of listening threads */
const static int MAX_NUM_LISTENING_THREADS = MAX_EIGER_PORTS;
const static int MAX_NUM_LISTENING_THREADS = EIGER_MAX_PORTS;
/** max number of writer threads */
const static int MAX_NUM_WRITER_THREADS = 15;