bit field for missing packets, added commands r_padding and r_discardpolicy, hdf5 bitfield metadata left to do

This commit is contained in:
2018-07-05 13:47:20 +02:00
parent ca8cb33569
commit c366e94a96
28 changed files with 796 additions and 451 deletions

View File

@ -23,23 +23,20 @@ using namespace std;
const string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool fwenable,
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
fileFormat* ftype, bool fwenable,
bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t,
uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t,
uint8_t, uint8_t,
char*, uint32_t, void*),
void (*dataModifyReadycb)(uint64_t, uint32_t, uint32_t, uint64_t,
uint64_t, uint16_t, uint16_t, uint16_t, uint16_t,
uint32_t, uint16_t, uint8_t, uint8_t,
char*, uint32_t &, void*),
bool* fp,
void (*dataReadycb)(char*, char*, uint32_t, void*),
void (*dataModifyReadycb)(char*, char*, uint32_t &, void*),
void *pDataReadycb) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
fifo(f),
myDetectorType(dtype),
file(0),
dataStreamEnable(dsEnable),
fileFormatType(ftype),
@ -59,6 +56,7 @@ DataProcessor::DataProcessor(int ind, Fifo*& f, fileFormat* ftype, bool fwenable
numFramesCaught(0),
currentFrameIndex(0),
silentMode(false),
framePadding(fp),
rawDataReadyCallBack(dataReadycb),
rawDataModifyReadyCallBack(dataModifyReadycb),
pRawDataReady(pDataReadycb)
@ -204,16 +202,21 @@ void DataProcessor::SetFileFormat(const fileFormat f) {
int nd[MAX_DIMENSIONS];nd[0] = 0; nd[1] = 0;
uint32_t* maxf = 0;
char* fname=0; char* fpath=0; uint64_t* findex=0;
bool* owenable=0; int* dindex=0; int* nunits=0; uint64_t* nf = 0; uint32_t* dr = 0; uint32_t* port = 0;
file->GetMemberPointerValues(nd, maxf, fname, fpath, findex, owenable, dindex, nunits, nf, dr, port);
bool* owenable=0; int* dindex=0; int* nunits=0; uint64_t* nf = 0;
uint32_t* dr = 0; uint32_t* port = 0;
file->GetMemberPointerValues(nd, maxf, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, port);
//create file writer with same pointers
SetupFileWriter(fileWriteEnable, nd, maxf, fname, fpath, findex, owenable, dindex, nunits, nf, dr, port);
SetupFileWriter(fileWriteEnable, nd, maxf, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, port);
}
}
void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf, char* fname, char* fpath, uint64_t* findex,
bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr, uint32_t* portno,
void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf,
char* fname, char* fpath, uint64_t* findex,
bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr,
uint32_t* portno,
GeneralData* g)
{
fileWriteEnable = fwe;
@ -247,11 +250,13 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf, char* fna
}
// only the first file
int DataProcessor::CreateNewFile(bool en, uint64_t nf, uint64_t at, uint64_t st, uint64_t sp, uint64_t ap) {
int DataProcessor::CreateNewFile(bool en, uint64_t nf, uint64_t at, uint64_t st,
uint64_t sp, uint64_t ap) {
if (file == NULL)
return FAIL;
file->CloseAllFiles();
if (file->CreateMasterFile(en, generalData->imageSize, generalData->nPixelsX, generalData->nPixelsY,
if (file->CreateMasterFile(en, generalData->imageSize,
generalData->nPixelsX, generalData->nPixelsY,
at, st, sp, ap) == FAIL)
return FAIL;
if (file->CreateFile(currentFrameIndex) == FAIL)
@ -276,7 +281,8 @@ void DataProcessor::ThreadExecution() {
char* buffer=0;
fifo->PopAddress(buffer);
#ifdef FIFODEBUG
if (!index) cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
if (!index) cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n",
index,(void*)(buffer),buffer);
#endif
//check dummy
@ -318,13 +324,14 @@ void DataProcessor::StopProcessing(char* buf) {
#endif
}
/** buf includes only the standard header */
void DataProcessor::ProcessAnImage(char* buf) {
sls_detector_header* header = (sls_detector_header*) (buf + FIFO_HEADER_NUMBYTES);
uint64_t fnum = header->frameNumber;
sls_receiver_header* rheader = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
sls_detector_header header = rheader->detHeader;
uint64_t fnum = header.frameNumber;
currentFrameIndex = fnum;
uint32_t nump = header->packetNumber;
uint32_t nump = header.packetNumber;
if (nump == generalData->packetsPerFrame) {
numFramesCaught++;
numTotalFramesCaught++;
@ -354,35 +361,34 @@ void DataProcessor::ProcessAnImage(char* buf) {
}
if (*gapPixelsEnable && (*dynamicRange!=4))
InsertGapPixels(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_detector_header), *dynamicRange);
InsertGapPixels(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
*dynamicRange);
// x coord is 0 for detector in pos [0,0,0]
if (xcoordin1D) {
// do nothing as detector has correctly send them
if (header->xCoord || header->yCoord || header->zCoord)
if (header.xCoord || header.yCoord || header.zCoord)
;
// detector has send all 0's when there should have been a value greater than 0 in some dimension
else
header->xCoord = xcoordin1D;
header.xCoord = xcoordin1D;
}
// frame padding
//std::string mystring = rheader->packetsMask.to_string<char,std::string::traits_type,std::string::allocator_type>();
// cprintf(BLUE, "%d: fnum:%llu mystring: %s nump:%u missing:%u \n",index, (long long unsigned int) fnum, mystring.c_str(), nump, generalData->packetsPerFrame-nump);
//if(!index)cprintf(BLUE, "%d: fnum:%llu nump:%u missing:%u \n",index, (long long unsigned int) fnum, nump, generalData->packetsPerFrame-nump);
if (*framePadding && nump < generalData->packetsPerFrame)
PadMissingPackets(buf);
if (rawDataReadyCallBack) {
rawDataReadyCallBack(
header->frameNumber,
header->expLength,
header->packetNumber,
header->bunchId,
header->timestamp,
header->modId,
header->xCoord,
header->yCoord,
header->zCoord,
header->debug,
header->roundRNumber,
header->detType,
header->version,
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_detector_header),
(char*)rheader,
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t*)buf)),
pRawDataReady);
}
@ -390,20 +396,8 @@ void DataProcessor::ProcessAnImage(char* buf) {
else if (rawDataModifyReadyCallBack) {cprintf(BG_GREEN,"Calling rawdatamodify\n");
uint32_t revsize = (uint32_t)(*((uint32_t*)buf));
rawDataModifyReadyCallBack(
header->frameNumber,
header->expLength,
header->packetNumber,
header->bunchId,
header->timestamp,
header->modId,
header->xCoord,
header->yCoord,
header->zCoord,
header->debug,
header->roundRNumber,
header->detType,
header->version,
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_detector_header),
(char*)rheader,
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
revsize,
pRawDataReady);
(*((uint32_t*)buf)) = revsize;
@ -411,7 +405,9 @@ void DataProcessor::ProcessAnImage(char* buf) {
if (file)
file->WriteToFile(buf + FIFO_HEADER_NUMBYTES, sizeof(sls_detector_header) + (uint32_t)(*((uint32_t*)buf)), fnum-firstMeasurementIndex, nump);
file->WriteToFile(buf + FIFO_HEADER_NUMBYTES,
sizeof(sls_receiver_header) + (uint32_t)(*((uint32_t*)buf)),
fnum-firstMeasurementIndex, nump);
@ -436,10 +432,13 @@ bool DataProcessor::CheckTimer() {
struct timespec end;
clock_gettime(CLOCK_REALTIME, &end);
#ifdef VERBOSE
cprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index, ( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0);
cprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index,
( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec )
/ 1000000000.0);
#endif
//still less than streaming timer, keep waiting
if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < ((double)*streamingTimerInMs/1000.00))
if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec )
/ 1000000000.0) < ((double)*streamingTimerInMs/1000.00))
return false;
//restart timer
@ -470,6 +469,54 @@ void DataProcessor::SetSilentMode(bool mode) {
silentMode = mode;
}
void DataProcessor::PadMissingPackets(char* buf) {
FILE_LOG(logDEBUG) << index << ": Padding Missing Packets";
uint32_t pperFrame = generalData->packetsPerFrame;
sls_receiver_header* header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
uint32_t nmissing = pperFrame - header->detHeader.packetNumber;
bitset<512> pmask = header->packetsMask;
uint32_t dsize = generalData->dataSize;
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - generalData->imageSize);
for (int pnum = 0; pnum < pperFrame; ++pnum) {
// not missing packet
if (pmask[pnum])
continue;
// done with padding, exit loop earlier
if (!nmissing)
break;
FILE_LOG(logDEBUG) << "padding for " << index << " for pnum: " << pnum << endl;
// missing packet
switch(myDetectorType) {
//for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if(!pnum)
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize-2);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize+2);
break;
case JUNGFRAUCTB:
if (pnum == (pperFrame-1))
memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
break;
default:
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
break;
}
--nmissing;
}
}
/** eiger specific */
void DataProcessor::InsertGapPixels(char* buf, uint32_t dr) {

View File

@ -180,7 +180,7 @@ void DataStreamer::StopProcessing(char* buf) {
if (!index)
cprintf(RED,"DataStreamer %d: Dummy\n", index);
#endif
sls_detector_header* header = (sls_detector_header*) (buf);
sls_receiver_header* header = (sls_receiver_header*) (buf);
//send dummy header and data
if (!SendHeader(header, 0, 0, 0, true))
cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index);
@ -195,8 +195,8 @@ void DataStreamer::StopProcessing(char* buf) {
/** buf includes only the standard header */
void DataStreamer::ProcessAnImage(char* buf) {
sls_detector_header* header = (sls_detector_header*) (buf + FIFO_HEADER_NUMBYTES);
uint64_t fnum = header->frameNumber;
sls_receiver_header* header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
uint64_t fnum = header->detHeader.frameNumber;
#ifdef VERBOSE
cprintf(MAGENTA,"DataStreamer %d: fnum:%lu\n", index,fnum);
#endif
@ -215,7 +215,7 @@ void DataStreamer::ProcessAnImage(char* buf) {
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
memcpy(completeBuffer + ((generalData->imageSize)**shortFrameEnable), buf + FIFO_HEADER_NUMBYTES + sizeof(sls_detector_header), (uint32_t)(*((uint32_t*)buf)) ); // new size possibly from callback
memcpy(completeBuffer + ((generalData->imageSize)**shortFrameEnable), buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header), (uint32_t)(*((uint32_t*)buf)) ); // new size possibly from callback
if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete))
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
@ -229,7 +229,7 @@ void DataStreamer::ProcessAnImage(char* buf) {
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_detector_header), (uint32_t)(*((uint32_t*)buf)) )) // new size possibly from callback
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header), (uint32_t)(*((uint32_t*)buf)) )) // new size possibly from callback
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
}
@ -237,21 +237,23 @@ void DataStreamer::ProcessAnImage(char* buf) {
int DataStreamer::SendHeader(sls_detector_header* header, uint32_t size, uint32_t nx, uint32_t ny, bool dummy) {
int DataStreamer::SendHeader(sls_receiver_header* rheader, uint32_t size, uint32_t nx, uint32_t ny, bool dummy) {
if (dummy)
return zmqSocket->SendHeaderData(index, dummy,SLS_DETECTOR_JSON_HEADER_VERSION);
uint64_t frameIndex = header->frameNumber - firstMeasurementIndex;
uint64_t acquisitionIndex = header->frameNumber - firstAcquisitionIndex;
sls_detector_header header = rheader->detHeader;
uint64_t frameIndex = header.frameNumber - firstMeasurementIndex;
uint64_t acquisitionIndex = header.frameNumber - firstAcquisitionIndex;
return zmqSocket->SendHeaderData(index, dummy, SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange, *fileIndex,
nx, ny, size,
acquisitionIndex, frameIndex, fileNametoStream,
header->frameNumber, header->expLength, header->packetNumber, header->bunchId, header->timestamp,
header->modId, header->xCoord, header->yCoord, header->zCoord,
header->debug, header->roundRNumber,
header->detType, header->version,
header.frameNumber, header.expLength, header.packetNumber, header.bunchId, header.timestamp,
header.modId, header.xCoord, header.yCoord, header.zCoord,
header.debug, header.roundRNumber,
header.detType, header.version,
flippedData,
additionJsonHeader
);

View File

@ -147,12 +147,12 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
numFramesInFile++;
numActualPacketsInFile += nump;
pthread_mutex_lock(&Mutex);
if (HDF5FileStatic::WriteDataFile(index, buffer + sizeof(sls_detector_header),
if (HDF5FileStatic::WriteDataFile(index, buffer + sizeof(sls_receiver_header),
// infinite then no need for %maxframesperfile
((*maxFramesPerFile == 0) ? fnum : fnum%(*maxFramesPerFile)),
nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
dataspace, dataset, datatype) == OK) {
sls_detector_header* header = (sls_detector_header*) (buffer);
sls_receiver_header* header = (sls_receiver_header*) (buffer);
/*header->xCoord = ((*detIndex) * (*numUnitsPerDetector) + index); */
if (HDF5FileStatic::WriteParameterDatasets(index, dataspace_para,
// infinite then no need for %maxframesperfile

View File

@ -21,7 +21,8 @@ const string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as, uint32_t* fpf) :
uint32_t* us, uint32_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
@ -36,6 +37,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
udpSocketBufferSize(us),
actualUDPSocketBufferSize(as),
framesPerFile(fpf),
frameDiscardMode(fdp),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
@ -300,7 +302,7 @@ void Listener::ThreadExecution() {
//error check, (should not be here) if not transmitting yet (previous if) rc should be > 0
if (rc <= 0) {
if (rc == 0) {
//cprintf(RED,"%d Socket shut down while waiting for future packet. udpsocketalive:%d\n",index, udpSocketAlive );
if (!udpSocketAlive) {
(*((uint32_t*)buffer)) = 0;
@ -310,6 +312,14 @@ void Listener::ThreadExecution() {
return;
}
// discarding image
else if (rc < 0) {
FILE_LOG(logDEBUG) << index << " discarding fnum:" << currentFrameIndex;
fifo->FreeAddress(buffer);
currentFrameIndex++;
return;
}
(*((uint32_t*)buffer)) = rc;
(*((uint64_t*)(buffer + FIFO_HEADER_NUMBYTES ))) = currentFrameIndex; //for those returning earlier
currentFrameIndex++;
@ -355,15 +365,16 @@ uint32_t Listener::ListenToAnImage(char* buf) {
uint32_t pperFrame = generalData->packetsPerFrame;
bool isHeaderEmpty = true;
sls_detector_header* old_header = 0;
sls_detector_header* new_header = 0;
sls_receiver_header* new_header = 0;
bool standardheader = generalData->standardheader;
uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - generalData->imageSize);
//reset to -1
memset(buf, 0, fifohsize);
memset(buf + fifohsize, 0xFF, generalData->imageSize);
new_header = (sls_detector_header*) (buf + FIFO_HEADER_NUMBYTES);
/*memset(buf + fifohsize, 0xFF, generalData->imageSize);*/
new_header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
//look for carry over
@ -387,7 +398,17 @@ uint32_t Listener::ListenToAnImage(char* buf) {
cprintf(RED,"Error:(Weird), With carry flag: Frame number %lu less than current frame number %lu\n", fnum, currentFrameIndex);
return 0;
}
new_header->packetNumber = numpackets;
switch(*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets)
return -1;
break;
case DISCARD_PARTIAL_FRAMES:
return -1;
default:
break;
}
new_header->detHeader.packetNumber = numpackets;
return generalData->imageSize;
}
@ -413,7 +434,8 @@ uint32_t Listener::ListenToAnImage(char* buf) {
}
carryOverFlag = false;
numpackets++; //number of packets in this image (each time its copied to buf)
++numpackets; //number of packets in this image (each time its copied to buf)
new_header->packetsMask[pnum] = 1;
//writer header
if(isHeaderEmpty) {
@ -423,11 +445,9 @@ uint32_t Listener::ListenToAnImage(char* buf) {
}
// -------------------old header ------------------------------------------------------------------------------
else {
memset(new_header, 0, sizeof(sls_detector_header));
new_header->frameNumber = fnum;
new_header->packetNumber = pperFrame;
new_header->detType = (uint8_t) generalData->myDetectorType;
new_header->version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
}
//------------------------------------------------------------------------------------------------------------
isHeaderEmpty = false;
@ -445,10 +465,21 @@ uint32_t Listener::ListenToAnImage(char* buf) {
if (udpSocketAlive){
rc = udpSocket->ReceiveDataOnly(listeningPacket);
}
// end of acquisition
if(rc <= 0) {
if (numpackets == 0) return 0; //empty image
new_header->packetNumber = numpackets; //number of packets caught
switch(*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets)
return -1;
break;
case DISCARD_PARTIAL_FRAMES:
return -1;
default:
break;
}
new_header->detHeader.packetNumber = numpackets; //number of packets caught
return generalData->imageSize; //empty packet now, but not empty image
}
@ -493,7 +524,17 @@ uint32_t Listener::ListenToAnImage(char* buf) {
carryOverFlag = true;
memcpy(carryOverPacket,listeningPacket, generalData->packetSize);
new_header->packetNumber = numpackets; //number of packets caught
switch(*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets)
return -1;
break;
case DISCARD_PARTIAL_FRAMES:
return -1;
default:
break;
}
new_header->detHeader.packetNumber = numpackets; //number of packets caught
return generalData->imageSize;
}
@ -517,7 +558,9 @@ uint32_t Listener::ListenToAnImage(char* buf) {
memcpy(buf + fifohsize + (pnum * dsize), listeningPacket + hsize, dsize);
break;
}
numpackets++; //number of packets in this image (each time its copied to buf)
++numpackets; //number of packets in this image (each time its copied to buf)
new_header->packetsMask[pnum] = 1;
if(isHeaderEmpty) {
// -------------------------- new header ----------------------------------------------------------------------
if (standardheader) {
@ -525,18 +568,17 @@ uint32_t Listener::ListenToAnImage(char* buf) {
}
// -------------------old header ------------------------------------------------------------------------------
else {
memset(new_header, 0, sizeof(sls_detector_header));
new_header->frameNumber = fnum;
new_header->packetNumber = pperFrame;
new_header->detType = (uint8_t) generalData->myDetectorType;
new_header->version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
}
//------------------------------------------------------------------------------------------------------------
isHeaderEmpty = false;
}
}
new_header->packetNumber = numpackets; //number of packets caught
// complete image
new_header->detHeader.packetNumber = numpackets; //number of packets caught
return generalData->imageSize;
}

View File

@ -60,6 +60,8 @@ void UDPBaseImplementation::initializeMembers(){
//***receiver parameters***
status = IDLE;
activated = true;
frameDiscardMode = NO_DISCARD;
framePadding = false;
//***connection parameters***
strcpy(eth,"");
@ -180,6 +182,16 @@ uint32_t UDPBaseImplementation::getFramesPerFile() const{
return framesPerFile;
}
slsReceiverDefs::frameDiscardPolicy UDPBaseImplementation::getFrameDiscardPolicy() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameDiscardMode;
}
bool UDPBaseImplementation::getFramePaddingEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return framePadding;
}
int UDPBaseImplementation::getScanTag() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return scanTag;
@ -451,6 +463,22 @@ void UDPBaseImplementation::setFramesPerFile(const uint32_t i){
FILE_LOG(logINFO) << "Frames per file: " << framesPerFile;
}
void UDPBaseImplementation::setFrameDiscardPolicy(const frameDiscardPolicy i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if (i >= 0 && i < NUM_DISCARD_POLICIES)
frameDiscardMode = i;
FILE_LOG(logINFO) << "Frame Discard Policy: " << getFrameDiscardPolicyType(frameDiscardMode);
}
void UDPBaseImplementation::setFramePaddingEnable(const bool i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
framePadding = i;
FILE_LOG(logINFO) << "Frame Padding: " << framePadding;
}
//FIXME: needed?
void UDPBaseImplementation::setScanTag(const int i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
@ -761,17 +789,13 @@ void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(uin
pAcquisitionFinished=arg;
}
void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(uint64_t,
uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t,
uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(char* ,
char*, uint32_t, void*),void *arg){
rawDataReadyCallBack=func;
pRawDataReady=arg;
}
void UDPBaseImplementation::registerCallBackRawDataModifyReady(void (*func)(uint64_t,
uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t,
uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
void UDPBaseImplementation::registerCallBackRawDataModifyReady(void (*func)(char* ,
char*, uint32_t&, void*),void *arg){
rawDataModifyReadyCallBack=func;
pRawDataReady=arg;

View File

@ -374,12 +374,14 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
try {
Listener* l = new Listener(i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile);
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode);
listener.push_back(l);
DataProcessor* p = new DataProcessor(i, fifo[i], &fileFormatType,
DataProcessor* p = new DataProcessor(i, myDetectorType, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
&framePadding,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
dataProcessor.push_back(p);
}

View File

@ -151,9 +151,7 @@ void slsReceiver::registerCallBackAcquisitionFinished(void (*func)(uint64_t, voi
}
void slsReceiver::registerCallBackRawDataReady(void (*func)(uint64_t, uint32_t,
uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t,
uint32_t, uint16_t, uint8_t, uint8_t,
void slsReceiver::registerCallBackRawDataReady(void (*func)(char*,
char*, uint32_t, void*),void *arg){
//tcpipInterface
if(udp_interface)
@ -163,9 +161,7 @@ void slsReceiver::registerCallBackRawDataReady(void (*func)(uint64_t, uint32_t,
}
void slsReceiver::registerCallBackRawDataModifyReady(void (*func)(uint64_t, uint32_t,
uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t,
uint32_t, uint16_t, uint8_t, uint8_t,
void slsReceiver::registerCallBackRawDataModifyReady(void (*func)(char*,
char*, uint32_t &, void*),void *arg){
//tcpipInterface
if(udp_interface)

View File

@ -171,17 +171,13 @@ void slsReceiverTCPIPInterface::registerCallBackAcquisitionFinished(void (*func)
pAcquisitionFinished=arg;
}
void slsReceiverTCPIPInterface::registerCallBackRawDataReady(void (*func)(uint64_t,
uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t,
uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
void slsReceiverTCPIPInterface::registerCallBackRawDataReady(void (*func)(char* ,
char*, uint32_t, void*),void *arg){
rawDataReadyCallBack=func;
pRawDataReady=arg;
}
void slsReceiverTCPIPInterface::registerCallBackRawDataModifyReady(void (*func)(uint64_t,
uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t,
uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
void slsReceiverTCPIPInterface::registerCallBackRawDataModifyReady(void (*func)(char* ,
char*, uint32_t &,void*),void *arg){
rawDataModifyReadyCallBack=func;
pRawDataReady=arg;
@ -299,6 +295,8 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE: return "F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE";
case F_SET_RECEIVER_FRAMES_PER_FILE:return "F_SET_RECEIVER_FRAMES_PER_FILE";
case F_RECEIVER_CHECK_VERSION: return "F_RECEIVER_CHECK_VERSION";
case F_RECEIVER_DISCARD_POLICY: return "F_RECEIVER_DISCARD_POLICY";
case F_RECEIVER_PADDING_ENABLE: return "F_RECEIVER_PADDING_ENABLE";
default: return "Unknown Function";
}
@ -353,6 +351,8 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_RECEIVER_REAL_UDP_SOCK_BUF_SIZE]= &slsReceiverTCPIPInterface::get_real_udp_socket_buffer_size;
flist[F_SET_RECEIVER_FRAMES_PER_FILE] = &slsReceiverTCPIPInterface::set_frames_per_file;
flist[F_RECEIVER_CHECK_VERSION] = &slsReceiverTCPIPInterface::check_version_compatibility;
flist[F_RECEIVER_DISCARD_POLICY] = &slsReceiverTCPIPInterface::set_discard_policy;
flist[F_RECEIVER_PADDING_ENABLE] = &slsReceiverTCPIPInterface::set_padding_enable;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
@ -692,6 +692,24 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
//frames per file
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getFramesPerFile();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
//frame discard policy
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getFrameDiscardPolicy();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
//frame padding
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getFramePaddingEnable();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// file write enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getFileWriteEnable();
@ -704,6 +722,12 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// gap pixels
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getGapPixelsEnable();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// receiver read frequency
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getFrameToGuiFrequency();
@ -716,12 +740,6 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// data streaming enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getDataStreamEnable();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// streaming source ip
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
path = receiverBase->getStreamingSourceIP();
@ -738,11 +756,11 @@ int slsReceiverTCPIPInterface::send_update() {
if (path != NULL)
delete[] path;
// gap pixels enable
// data streaming enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind = (int)receiverBase->getGapPixelsEnable();
ind=(int)receiverBase->getDataStreamEnable();
#endif
mySock->SendDataOnly(&ind,sizeof(ind));
n += mySock->SendDataOnly(&ind,sizeof(ind));
if (!lockStatus)
strcpy(mySock->lastClientIP,mySock->thisClientIP);
@ -2820,3 +2838,114 @@ int slsReceiverTCPIPInterface::check_version_compatibility() {
return ret;
}
int slsReceiverTCPIPInterface::set_discard_policy() {
ret = OK;
memset(mess, 0, sizeof(mess));
int index = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&index,sizeof(index)) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(index >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setFrameDiscardPolicy((frameDiscardPolicy)index);
}
}
//get
retval=receiverBase->getFrameDiscardPolicy();
if(index >= 0 && retval != index) {
ret = FAIL;
strcpy(mess, "Could not set frame discard policy\n");
FILE_LOG(logERROR) << mess;
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "frame discard policy:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}
int slsReceiverTCPIPInterface::set_padding_enable() {
ret = OK;
memset(mess, 0, sizeof(mess));
int index = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&index,sizeof(index)) < 0 )
return printSocketReadError();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(index >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
index = (index == 0) ? 0 : 1;
receiverBase->setFramePaddingEnable(index);
}
}
//get
retval=(int)receiverBase->getFramePaddingEnable();
if(index >= 0 && retval != index) {
ret = FAIL;
strcpy(mess, "Could not set frame padding enable\n");
FILE_LOG(logERROR) << mess;
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "Frame Padding Enable:" << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}

View File

@ -29,14 +29,12 @@ void slsReceiverUsers::registerCallBackAcquisitionFinished(void (*func)(uint64_t
receiver->registerCallBackAcquisitionFinished(func,arg);
}
void slsReceiverUsers::registerCallBackRawDataReady(void (*func)(uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp,
uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber, uint8_t detType, uint8_t version,
void slsReceiverUsers::registerCallBackRawDataReady(void (*func)(char* header,
char* datapointer, uint32_t datasize, void*), void *arg){
receiver->registerCallBackRawDataReady(func,arg);
}
void slsReceiverUsers::registerCallBackRawDataModifyReady(void (*func)(uint64_t frameNumber, uint32_t expLength, uint32_t packetNumber, uint64_t bunchId, uint64_t timestamp,
uint16_t modId, uint16_t xCoord, uint16_t yCoord, uint16_t zCoord, uint32_t debug, uint16_t roundRNumber, uint8_t detType, uint8_t version,
void slsReceiverUsers::registerCallBackRawDataModifyReady(void (*func)(char* header,
char* datapointer, uint32_t& revDatasize, void*), void *arg){
receiver->registerCallBackRawDataModifyReady(func,arg);
}