after merging with developer

This commit is contained in:
2018-11-02 10:48:06 +01:00
134 changed files with 4245 additions and 4058 deletions

View File

@ -9,7 +9,6 @@
#include "Fifo.h"
#include <iostream>
using namespace std;
FILE* BinaryFile::masterfd = 0;
@ -53,7 +52,7 @@ int BinaryFile::CreateFile(uint64_t fnum) {
if (BinaryFileStatic::CreateDataFile(filefd, *overWriteEnable, currentFileName, FILE_BUFFER_SIZE) == FAIL)
return FAIL;
if(!silentMode) {
if(!(*silentMode)) {
FILE_LOG(logINFO) << "[" << *udpPortNumber << "]: Binary File created: " << currentFileName;
}
return OK;
@ -125,7 +124,7 @@ int BinaryFile::CreateMasterFile(bool en, uint32_t size,
if (master && (*detIndex==0)) {
masterFileName = BinaryFileStatic::CreateMasterFileName(filePath,
fileNamePrefix, *fileIndex);
if(!silentMode) {
if(!(*silentMode)) {
FILE_LOG(logINFO) << "Master File: " << masterFileName;
}
return BinaryFileStatic::CreateMasterDataFile(masterfd, masterFileName,

View File

@ -18,16 +18,15 @@
#include <iostream>
#include <errno.h>
#include <cstring>
using namespace std;
const string DataProcessor::TypeName = "DataProcessor";
const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
fileFormat* ftype, bool fwenable,
bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
bool* fp,
bool* fp, bool* act, bool* depaden, bool* sm,
void (*dataReadycb)(char*, char*, uint32_t, void*),
void (*dataModifyReadycb)(char*, char*, uint32_t &, void*),
void *pDataReadycb) :
@ -47,7 +46,10 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
streamingTimerInMs(timer),
currentFreqCount(0),
tempBuffer(0),
xcoordin1D(0),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
framePadding(fp),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
@ -55,8 +57,6 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
numTotalFramesCaught(0),
numFramesCaught(0),
currentFrameIndex(0),
silentMode(false),
framePadding(fp),
rawDataReadyCallBack(dataReadycb),
rawDataModifyReadyCallBack(dataModifyReadycb),
pRawDataReady(pDataReadycb)
@ -77,7 +77,7 @@ DataProcessor::~DataProcessor() {
}
/** getters */
string DataProcessor::GetType(){
std::string DataProcessor::GetType(){
return TypeName;
}
@ -222,8 +222,6 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf,
fileWriteEnable = fwe;
if (g)
generalData = g;
// fix xcoord as detector is not providing it right now
xcoordin1D = ((*dindex) * (*nunits)) + index;
if (file) {
@ -237,13 +235,13 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf,
file = new HDF5File(index, maxf,
nd, fname, fpath, findex, owenable,
dindex, nunits, nf, dr, portno,
generalData->nPixelsX, generalData->nPixelsY, &silentMode);
generalData->nPixelsX, generalData->nPixelsY, silentMode);
break;
#endif
default:
file = new BinaryFile(index, maxf,
nd, fname, fpath, findex, owenable,
dindex, nunits, nf, dr, portno, &silentMode);
dindex, nunits, nf, dr, portno, silentMode);
break;
}
}
@ -364,18 +362,11 @@ void DataProcessor::ProcessAnImage(char* buf) {
InsertGapPixels(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
*dynamicRange);
// x coord is 0 for detector in pos [0,0,0]
if (xcoordin1D) {
// do nothing as detector has correctly send them
if (header.xCoord || header.yCoord || header.zCoord)
;
// detector has send all 0's when there should have been a value greater than 0 in some dimension
else
header.xCoord = xcoordin1D;
}
// frame padding
if (*framePadding && nump < generalData->packetsPerFrame)
// deactivated and padding enabled
if ((!(*activated) && *deactivatedPaddingEnable) ||
// frame padding
(*framePadding && nump < generalData->packetsPerFrame))
PadMissingPackets(buf);
// normal call back
@ -398,10 +389,11 @@ void DataProcessor::ProcessAnImage(char* buf) {
(*((uint32_t*)buf)) = revsize;
}
// write to file
if (file)
file->WriteToFile(buf + FIFO_HEADER_NUMBYTES,
sizeof(sls_receiver_header) + (uint32_t)(*((uint32_t*)buf)),
sizeof(sls_receiver_header) + (uint32_t)(*((uint32_t*)buf)), //+ size of data (resizable from previous call back
fnum-firstMeasurementIndex, nump);
@ -460,9 +452,6 @@ void DataProcessor::SetPixelDimension() {
}
}
void DataProcessor::SetSilentMode(bool mode) {
silentMode = mode;
}
void DataProcessor::PadMissingPackets(char* buf) {
FILE_LOG(logDEBUG) << index << ": Padding Missing Packets";
@ -488,7 +477,7 @@ void DataProcessor::PadMissingPackets(char* buf) {
if (!nmissing)
break;
FILE_LOG(logDEBUG) << "padding for " << index << " for pnum: " << pnum << endl;
FILE_LOG(logDEBUG) << "padding for " << index << " for pnum: " << pnum << std::endl;
// missing packet
switch(myDetectorType) {

View File

@ -11,28 +11,29 @@
#include <iostream>
#include <errno.h>
using namespace std;
const string DataStreamer::TypeName = "DataStreamer";
const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh) :
DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, std::vector<ROI>* r,
uint64_t* fi, int* fd, char* ajh, bool* sm) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
fifo(f),
zmqSocket(0),
dynamicRange(dr),
shortFrameEnable(sEnable),
roi(r),
adcConfigured(-1),
fileIndex(fi),
flippedData(fd),
additionJsonHeader(ajh),
silentMode(sm),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
firstMeasurementIndex(0),
completeBuffer(0),
flippedData(fd),
additionJsonHeader(ajh),
silentMode(false)
completeBuffer(0)
{
if(ThreadObject::CreateThread() == FAIL)
throw std::exception();
@ -50,7 +51,7 @@ DataStreamer::~DataStreamer() {
}
/** getters */
string DataStreamer::GetType(){
std::string DataStreamer::GetType(){
return TypeName;
}
@ -87,7 +88,10 @@ void DataStreamer::ResetParametersforNewMeasurement(char* fname){
delete [] completeBuffer;
completeBuffer = 0;
}
if (*shortFrameEnable >= 0) {
if (roi->size()) {
if (generalData->myDetectorType == GOTTHARD) {
adcConfigured = generalData->GetAdcConfigured(index, roi);
}
completeBuffer = new char[generalData->imageSizeComplete];
memset(completeBuffer, 0, generalData->imageSizeComplete);
}
@ -132,7 +136,7 @@ void DataStreamer::CreateZmqSockets(int* nunits, uint32_t port, const char* srci
uint32_t portnum = port + index;
try {
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL));
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL));
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index);
throw;
@ -211,11 +215,19 @@ void DataStreamer::ProcessAnImage(char* buf) {
//shortframe gotthard
if (completeBuffer) {
if (!SendHeader(header, (uint32_t)(*((uint32_t*)buf)), generalData->nPixelsXComplete, generalData->nPixelsYComplete, false))
//disregarding the size modified from callback (always using imageSizeComplete
// instead of buf (32 bit) because gui needs imagesizecomplete and listener
//write imagesize
if (!SendHeader(header, generalData->imageSizeComplete,
generalData->nPixelsXComplete, generalData->nPixelsYComplete, false))
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
memcpy(completeBuffer + ((generalData->imageSize)**shortFrameEnable), buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header), (uint32_t)(*((uint32_t*)buf)) ); // new size possibly from callback
memcpy(completeBuffer + ((generalData->imageSize) * adcConfigured),
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t*)buf)) );
if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete))
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
@ -225,11 +237,13 @@ void DataStreamer::ProcessAnImage(char* buf) {
//normal
else {
if (!SendHeader(header, (uint32_t)(*((uint32_t*)buf)), generalData->nPixelsX, generalData->nPixelsY, false)) // new size possibly from callback
if (!SendHeader(header, (uint32_t)(*((uint32_t*)buf)),
generalData->nPixelsX, generalData->nPixelsY, false)) // new size possibly from callback
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header), (uint32_t)(*((uint32_t*)buf)) )) // new size possibly from callback
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t*)buf)) )) // new size possibly from callback
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
}
@ -251,7 +265,7 @@ int DataStreamer::SendHeader(sls_receiver_header* rheader, uint32_t size, uint32
nx, ny, size,
acquisitionIndex, frameIndex, fileNametoStream,
header.frameNumber, header.expLength, header.packetNumber, header.bunchId, header.timestamp,
header.modId, header.xCoord, header.yCoord, header.zCoord,
header.modId, header.row, header.column, header.reserved,
header.debug, header.roundRNumber,
header.detType, header.version,
flippedData,
@ -272,7 +286,3 @@ int DataStreamer::RestreamStop() {
}
void DataStreamer::SetSilentMode(bool mode) {
silentMode = mode;
}

View File

@ -10,7 +10,6 @@
#include <iostream>
#include <cstdlib>
#include <cstring>
using namespace std;
Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t depth):

View File

@ -7,7 +7,6 @@
#include "File.h"
#include <iostream>
using namespace std;
File::File(int ind, uint32_t* maxf,
@ -35,29 +34,29 @@ File::File(int ind, uint32_t* maxf,
File::~File() {}
string File::GetCurrentFileName() {
std::string File::GetCurrentFileName() {
return currentFileName;
}
void File::PrintMembers() {
FILE_LOG(logINFO) << "\nGeneral Writer Variables:" << endl
<< "Index: " << index << endl
<< "Max Frames Per File: " << *maxFramesPerFile << endl
<< "Number of Detectors in x dir: " << numDetX << endl
<< "Number of Detectors in y dir: " << numDetY << endl
<< "File Name Prefix: " << fileNamePrefix << endl
<< "File Path: " << filePath << endl
<< "File Index: " << *fileIndex << endl
<< "Over Write Enable: " << *overWriteEnable << endl
FILE_LOG(logINFO) << "\nGeneral Writer Variables:" << std::endl
<< "Index: " << index << std::endl
<< "Max Frames Per File: " << *maxFramesPerFile << std::endl
<< "Number of Detectors in x dir: " << numDetX << std::endl
<< "Number of Detectors in y dir: " << numDetY << std::endl
<< "File Name Prefix: " << fileNamePrefix << std::endl
<< "File Path: " << filePath << std::endl
<< "File Index: " << *fileIndex << std::endl
<< "Over Write Enable: " << *overWriteEnable << std::endl
<< "Detector Index: " << *detIndex << endl
<< "Number of Units Per Detector: " << *numUnitsPerDetector << endl
<< "Number of Images in Acquisition: " << *numImages << endl
<< "Dynamic Range: " << *dynamicRange << endl
<< "UDP Port number: " << *udpPortNumber << endl
<< "Master File Name: " << masterFileName << endl
<< "Current File Name: " << currentFileName << endl
<< "Silent Mode: " << silentMode;
<< "Detector Index: " << *detIndex << std::endl
<< "Number of Units Per Detector: " << *numUnitsPerDetector << std::endl
<< "Number of Images in Acquisition: " << *numImages << std::endl
<< "Dynamic Range: " << *dynamicRange << std::endl
<< "UDP Port number: " << *udpPortNumber << std::endl
<< "Master File Name: " << masterFileName << std::endl
<< "Current File Name: " << currentFileName << std::endl
<< "Silent Mode: " << *silentMode;
}

View File

@ -11,8 +11,6 @@
#include <iomanip>
#include <libgen.h> //basename
#include <string.h>
using namespace std;
pthread_mutex_t HDF5File::Mutex = PTHREAD_MUTEX_INITIALIZER;
@ -65,13 +63,13 @@ HDF5File::HDF5File(int ind, uint32_t* maxf,
parameterNames.push_back("mod id");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("x Coord");
parameterNames.push_back("row");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("y Coord");
parameterNames.push_back("column");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("z Coord");
parameterNames.push_back("reserved");
parameterDataTypes.push_back(PredType::STD_U16LE);
parameterNames.push_back("debug");
@ -159,7 +157,7 @@ int HDF5File::CreateFile(uint64_t fnum) {
if (dataspace == NULL)
cprintf(RED,"Got nothing!\n");
if(!silentMode) {
if(!(*silentMode)) {
FILE_LOG(logINFO) << *udpPortNumber << ": HDF5 File created: " << currentFileName;
}
return OK;
@ -170,6 +168,13 @@ void HDF5File::CloseCurrentFile() {
pthread_mutex_lock(&Mutex);
HDF5FileStatic::CloseDataFile(index, filefd);
pthread_mutex_unlock(&Mutex);
for (unsigned int i = 0; i < dataset_para.size(); ++i)
delete dataset_para[i];
dataset_para.clear();
if(dataspace_para) {delete dataspace_para;dataspace_para=0;}
if(dataset) {delete dataset;dataset=0;}
if(dataspace) {delete dataspace;dataspace=0;}
if(filefd) {delete filefd;filefd=0;}
}
@ -182,10 +187,19 @@ void HDF5File::CloseAllFiles() {
HDF5FileStatic::CloseVirtualDataFile(virtualfd);
}
pthread_mutex_unlock(&Mutex);
for (unsigned int i = 0; i < dataset_para.size(); ++i)
delete dataset_para[i];
dataset_para.clear();
if(dataspace_para) delete dataspace_para;
if(dataset) delete dataset;
if(dataspace) delete dataspace;
if(filefd) delete filefd;
}
int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t nump) {
// check if maxframesperfile = 0 for infinite
if ((*maxFramesPerFile) && (numFramesInFile >= (*maxFramesPerFile))) {
CloseCurrentFile();
@ -199,7 +213,7 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
if (fnum >= extNumImages) {
if (HDF5FileStatic::ExtendDataset(index, dataspace, dataset,
dataspace_para, dataset_para, *numImages) == OK) {
if (!silentMode) {
if (!(*silentMode)) {
cprintf(BLUE,"%d Extending HDF5 dataset by %llu, Total x Dimension: %llu\n",
index, (long long unsigned int)extNumImages,
(long long unsigned int)(extNumImages + *numImages));
@ -213,12 +227,12 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
((*maxFramesPerFile == 0) ? fnum : fnum%(*maxFramesPerFile)),
nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
dataspace, dataset, datatype) == OK) {
sls_receiver_header* header = (sls_receiver_header*) (buffer);
/*header->xCoord = ((*detIndex) * (*numUnitsPerDetector) + index); */
if (HDF5FileStatic::WriteParameterDatasets(index, dataspace_para,
// infinite then no need for %maxframesperfile
((*maxFramesPerFile == 0) ? fnum : fnum%(*maxFramesPerFile)),
dataset_para, header, parameterDataTypes) == OK) {
dataset_para, (sls_receiver_header*) (buffer),
parameterDataTypes) == OK) {
pthread_mutex_unlock(&Mutex);
return OK;
}
@ -242,7 +256,7 @@ int HDF5File::CreateMasterFile(bool en, uint32_t size,
virtualfd = 0;
masterFileName = HDF5FileStatic::CreateMasterFileName(filePath,
fileNamePrefix, *fileIndex);
if(!silentMode) {
if(!(*silentMode)) {
FILE_LOG(logINFO) << "Master File: " << masterFileName;
}
pthread_mutex_lock(&Mutex);
@ -262,43 +276,57 @@ void HDF5File::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
//not created before
if (!virtualfd && anyPacketsCaught) {
//only one file and one sub image (link current file in master)
if (((numFilesinAcquisition == 1) && (numDetY*numDetX) == 1)) {
//dataset name
ostringstream osfn;
osfn << "/data";
if ((*numImages > 1)) osfn << "_f" << setfill('0') << setw(12) << 0;
string dsetname = osfn.str();
pthread_mutex_lock(&Mutex);
HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName,
dsetname, parameterNames);
pthread_mutex_unlock(&Mutex);
}
// called only by the one maser receiver
if (master && (*detIndex==0)) {
//create virutal file
else
CreateVirtualFile(numf);
//only one file and one sub image (link current file in master)
if (((numFilesinAcquisition == 1) && (numDetY*numDetX) == 1)) {
LinkVirtualFileinMasterFile();
}
//create virutal file
else{
CreateVirtualFile(numf);}
}
}
numFilesinAcquisition = 0;
}
// called only by the one maser receiver
int HDF5File::CreateVirtualFile(uint64_t numf) {
if (master && (*detIndex==0)) {
pthread_mutex_lock(&Mutex);
int ret = HDF5FileStatic::CreateVirtualDataFile(
virtualfd, masterFileName,
filePath, fileNamePrefix, *fileIndex, (*numImages > 1),
*detIndex, *numUnitsPerDetector,
// infinite images in 1 file, then maxfrperfile = numf
((*maxFramesPerFile == 0) ? numf+1 : *maxFramesPerFile),
numf+1,
"data", datatype,
numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
HDF5_WRITER_VERSION,
parameterNames, parameterDataTypes);
pthread_mutex_unlock(&Mutex);
return ret;
pthread_mutex_lock(&Mutex);
std::string vname = HDF5FileStatic::CreateVirtualFileName(filePath, fileNamePrefix, *fileIndex);
if(!(*silentMode)) {
FILE_LOG(logINFO) << "Virtual File: " << vname;
}
return OK;
int ret = HDF5FileStatic::CreateVirtualDataFile(vname,
virtualfd, masterFileName,
filePath, fileNamePrefix, *fileIndex, (*numImages > 1),
*detIndex, *numUnitsPerDetector,
// infinite images in 1 file, then maxfrperfile = numf
((*maxFramesPerFile == 0) ? numf+1 : *maxFramesPerFile),
numf+1,
"data", datatype,
numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
HDF5_WRITER_VERSION,
parameterNames, parameterDataTypes);
pthread_mutex_unlock(&Mutex);
return ret;
}
// called only by the one maser receiver
int HDF5File::LinkVirtualFileinMasterFile() {
//dataset name
std::ostringstream osfn;
osfn << "/data";
if ((*numImages > 1)) osfn << "_f" << std::setfill('0') << std::setw(12) << 0;
std::string dsetname = osfn.str();
pthread_mutex_lock(&Mutex);
int ret = HDF5FileStatic::LinkVirtualInMaster(masterFileName, currentFileName,
dsetname, parameterNames);
pthread_mutex_unlock(&Mutex);
return ret;
}

View File

@ -14,15 +14,14 @@
#include <iostream>
#include <errno.h>
#include <cstring>
using namespace std;
const string Listener::TypeName = "Listener";
const std::string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp) :
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
@ -38,6 +37,11 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
actualUDPSocketBufferSize(as),
framesPerFile(fpf),
frameDiscardMode(fdp),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
row(0),
column(0),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
@ -50,8 +54,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
listeningPacket(0),
udpSocketAlive(0),
numPacketsStatistic(0),
numFramesStatistic(0),
silentMode(false)
numFramesStatistic(0)
{
if(ThreadObject::CreateThread() == FAIL)
throw std::exception();
@ -70,7 +73,7 @@ Listener::~Listener() {
}
/** getters */
string Listener::GetType(){
std::string Listener::GetType(){
return TypeName;
}
@ -155,7 +158,7 @@ void Listener::RecordFirstIndices(uint64_t fnum) {
firstAcquisitionIndex = fnum;
}
if(!silentMode) {
if(!(*silentMode)) {
if (!index) cprintf(BLUE,"%d First Acquisition Index:%lu\n"
"%d First Measurement Index:%lu\n",
index, firstAcquisitionIndex,
@ -183,6 +186,10 @@ int Listener::SetThreadPriority(int priority) {
int Listener::CreateUDPSockets() {
if (!(*activated)) {
return OK;
}
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
memset(eth, 0, MAX_STR_LENGTH);
@ -193,16 +200,16 @@ int Listener::CreateUDPSockets() {
ShutDownUDPSocket();
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
*udpSocketBufferSize);
int iret = udpSocket->getErrorStatus();
if(!iret){
try{
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
*udpSocketBufferSize);
FILE_LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber;
}else{
FILE_LOG(logERROR) << "Could not create UDP socket on port " << *udpPortNumber << " error: " << iret;
} catch (...) {
FILE_LOG(logERROR) << "Could not create UDP socket on port " << *udpPortNumber;
return FAIL;
}
udpSocketAlive = true;
sem_init(&semaphore_socket,1,0);
@ -229,13 +236,14 @@ void Listener::ShutDownUDPSocket() {
}
void Listener::SetSilentMode(bool mode) {
silentMode = mode;
}
int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
FILE_LOG(logINFO) << "Testing UDP Socket Buffer size with test port " << *udpPortNumber;
if (!(*activated)) {
*actualUDPSocketBufferSize = (s*2);
return OK;
}
uint32_t temp = *udpSocketBufferSize;
*udpSocketBufferSize = s;
@ -248,17 +256,20 @@ int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
if(udpSocket){
udpSocket->ShutDownSocket();
delete udpSocket;
udpSocket = 0;
}
//create dummy socket
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
try {
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
*udpSocketBufferSize);
int iret = udpSocket->getErrorStatus();
if (iret){
FILE_LOG(logERROR) << "Could not create a test UDP socket on port " << *udpPortNumber << " error: " << iret;
} catch (...) {
FILE_LOG(logERROR) << "Could not create a test UDP socket on port " << *udpPortNumber;
return FAIL;
}
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
if (*actualUDPSocketBufferSize != (s*2)) {
@ -277,6 +288,10 @@ int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
return OK;
}
void Listener::SetHardCodedPosition(uint16_t r, uint16_t c) {
row = r;
column = c;
}
void Listener::ThreadExecution() {
char* buffer;
@ -288,7 +303,7 @@ void Listener::ThreadExecution() {
#endif
//udpsocket doesnt exist
if (!udpSocketAlive && !carryOverFlag) {
if (*activated && !udpSocketAlive && !carryOverFlag) {
//FILE_LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not created or shut down earlier";
(*((uint32_t*)buffer)) = 0;
StopListening(buffer);
@ -296,7 +311,7 @@ void Listener::ThreadExecution() {
}
//get data
if ((*status != TRANSMITTING && udpSocketAlive) || carryOverFlag) {
if ((*status != TRANSMITTING && (!(*activated) || udpSocketAlive)) || carryOverFlag) {
rc = ListenToAnImage(buffer);
}
@ -328,7 +343,7 @@ void Listener::ThreadExecution() {
fifo->PushAddress(buffer);
//Statistics
if(!silentMode) {
if(!(*silentMode)) {
numFramesStatistic++;
if (numFramesStatistic >=
//second condition also for infinite #number of frames
@ -354,6 +369,7 @@ void Listener::StopListening(char* buf) {
/* buf includes the fifo header and packet header */
uint32_t Listener::ListenToAnImage(char* buf) {
int rc = 0;
uint64_t fnum = 0, bid = 0;
uint32_t pnum = 0, snum = 0;
@ -375,6 +391,27 @@ uint32_t Listener::ListenToAnImage(char* buf) {
/*memset(buf + fifohsize, 0xFF, generalData->imageSize);*/
new_header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
// deactivated (eiger)
if (!(*activated)) {
// no padding
if (!(*deactivatedPaddingEnable))
return 0;
// padding without setting bitmask (all missing packets padded in dataProcessor)
if (currentFrameIndex >= *numImages)
return 0;
//(eiger) first fnum starts at 1
if (!currentFrameIndex) {
++currentFrameIndex;
}
new_header->detHeader.frameNumber = currentFrameIndex;
new_header->detHeader.row = row;
new_header->detHeader.column = column;
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
return generalData->imageSize;
}
//look for carry over
@ -409,6 +446,10 @@ uint32_t Listener::ListenToAnImage(char* buf) {
break;
}
new_header->detHeader.packetNumber = numpackets;
if(isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
return generalData->imageSize;
}
@ -446,6 +487,8 @@ uint32_t Listener::ListenToAnImage(char* buf) {
// -------------------old header ------------------------------------------------------------------------------
else {
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.row = row;
new_header->detHeader.column = column;
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
}
@ -480,6 +523,10 @@ uint32_t Listener::ListenToAnImage(char* buf) {
break;
}
new_header->detHeader.packetNumber = numpackets; //number of packets caught
if(isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
return generalData->imageSize; //empty packet now, but not empty image
}
@ -535,6 +582,10 @@ uint32_t Listener::ListenToAnImage(char* buf) {
break;
}
new_header->detHeader.packetNumber = numpackets; //number of packets caught
if(isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
return generalData->imageSize;
}
@ -569,6 +620,8 @@ uint32_t Listener::ListenToAnImage(char* buf) {
// -------------------old header ------------------------------------------------------------------------------
else {
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.row = row;
new_header->detHeader.column = column;
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
}

View File

@ -7,13 +7,6 @@
#include <iostream>
#include <cstdio>
using namespace std;

View File

@ -9,7 +9,6 @@
#include <iostream>
#include <syscall.h>
using namespace std;

View File

@ -5,13 +5,10 @@
***********************************************/
#include "UDPBaseImplementation.h"
#include "genericSocket.h"
#include "ZmqSocket.h"
#include <sys/stat.h> // stat
#include <iostream>
#include <string.h>
using namespace std;
@ -60,6 +57,7 @@ void UDPBaseImplementation::initializeMembers(){
//***receiver parameters***
status = IDLE;
activated = true;
deactivatedPaddingEnable = true;
frameDiscardMode = NO_DISCARD;
framePadding = false;
@ -83,7 +81,7 @@ void UDPBaseImplementation::initializeMembers(){
dataCompressionEnable = false;
//***acquisition parameters***
shortFrameEnable = -1;
roi.clear();
frameToGuiFrequency = 0;
frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER_IN_MS;
dataStreamEnable = false;
@ -92,7 +90,7 @@ void UDPBaseImplementation::initializeMembers(){
memset(additionalJsonHeader, 0, sizeof(additionalJsonHeader));
//***receiver parameters***
silentMode = 0;
silentMode = false;
}
@ -251,9 +249,9 @@ char *UDPBaseImplementation::getEthernetInterface() const{
/***acquisition parameters***/
int UDPBaseImplementation::getShortFrameEnable() const{
std::vector<slsReceiverDefs::ROI> UDPBaseImplementation::getROI() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return shortFrameEnable;
return roi;
}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{
@ -320,15 +318,20 @@ slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return status;}
uint32_t UDPBaseImplementation::getSilentMode() const{
bool UDPBaseImplementation::getSilentMode() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return silentMode;}
int UDPBaseImplementation::getActivate() const{
bool UDPBaseImplementation::getActivate() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return activated;
}
bool UDPBaseImplementation::getDeactivatedPadding() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return deactivatedPaddingEnable;
}
uint32_t UDPBaseImplementation::getStreamingPort() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return streamingPort;
@ -369,7 +372,7 @@ uint32_t UDPBaseImplementation::getActualUDPSocketBufferSize() const {
*************************************************************************/
/**initial parameters***/
void UDPBaseImplementation::configure(map<string, string> config_map){
void UDPBaseImplementation::configure(std::map<std::string, std::string> config_map){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
@ -537,11 +540,27 @@ void UDPBaseImplementation::setEthernetInterface(const char* c){
/***acquisition parameters***/
int UDPBaseImplementation::setShortFrameEnable(const int i){
int UDPBaseImplementation::setROI(const std::vector<slsReceiverDefs::ROI> i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
shortFrameEnable = i;
FILE_LOG(logINFO) << "Short Frame Enable: " << stringEnable(shortFrameEnable);
roi = i;
std::stringstream sstm;
sstm << "ROI: ";
if (!roi.size())
sstm << "0";
else {
for (unsigned int i = 0; i < roi.size(); ++i) {
sstm << "( " <<
roi[i].xmin << ", " <<
roi[i].xmax << ", " <<
roi[i].ymin << ", " <<
roi[i].ymax << " )";
}
}
std::string message = sstm.str();
FILE_LOG(logINFO) << message;
//overrridden child classes might return FAIL
return OK;
}
@ -660,7 +679,7 @@ int UDPBaseImplementation::setFifoDepth(const uint32_t i){
}
/***receiver parameters***/
void UDPBaseImplementation::setSilentMode(const uint32_t i){
void UDPBaseImplementation::setSilentMode(const bool i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
silentMode = i;
@ -735,17 +754,20 @@ void UDPBaseImplementation::abort(){
}
int UDPBaseImplementation::setActivate(int enable){
bool UDPBaseImplementation::setActivate(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(enable != -1){
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
}
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
return activated;
}
bool UDPBaseImplementation::setDeactivatedPadding(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
deactivatedPaddingEnable = enable;
FILE_LOG(logINFO) << "Deactivated Padding Enable: " << stringEnable(deactivatedPaddingEnable);
return deactivatedPaddingEnable;
}
void UDPBaseImplementation::setStreamingPort(const uint32_t i) {
streamingPort = i;

View File

@ -7,16 +7,14 @@
#include <iostream>
#include <string.h>
using namespace std;
#include "UDPInterface.h"
#include "UDPBaseImplementation.h"
#include "UDPStandardImplementation.h"
using namespace std;
UDPInterface * UDPInterface::create(string receiver_type){
UDPInterface * UDPInterface::create(std::string receiver_type){
if (receiver_type == "standard"){
FILE_LOG(logINFO) << "Starting " << receiver_type;

View File

@ -16,7 +16,6 @@
#include <cstring> //strcpy
#include <errno.h> //eperm
#include <fstream>
using namespace std;
/** cosntructor & destructor */
@ -33,16 +32,16 @@ UDPStandardImplementation::~UDPStandardImplementation() {
void UDPStandardImplementation::DeleteMembers() {
if (generalData) { delete generalData; generalData=0;}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
}
@ -69,8 +68,8 @@ uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
vector<DataProcessor*>::const_iterator it;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
std::vector<DataProcessor*>::const_iterator it;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0);
sum += (*it)->GetNumTotalFramesCaught();
}
@ -85,7 +84,7 @@ uint64_t UDPStandardImplementation::getFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetNumFramesCaught();
}
@ -100,7 +99,7 @@ int64_t UDPStandardImplementation::getAcquisitionIndex() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetActualProcessedAcquisitionIndex();
}
@ -120,7 +119,7 @@ int UDPStandardImplementation::setGapPixelsEnable(const bool b) {
// side effects
generalData->SetGapPixelsEnable(b, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
@ -144,7 +143,7 @@ void UDPStandardImplementation::setFileFormat(const fileFormat f){
break;
}
//destroy file writer, set file format and create file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType);
@ -168,35 +167,64 @@ void UDPStandardImplementation::setFileWriteEnable(const bool b){
int UDPStandardImplementation::setShortFrameEnable(const int i) {
int UDPStandardImplementation::setROI(const std::vector<slsReceiverDefs::ROI> i) {
if (myDetectorType != GOTTHARD) {
cprintf(RED, "Error: Can not set short frame for this detector\n");
cprintf(RED, "Error: Can not set ROI for this detector\n");
return FAIL;
}
if (shortFrameEnable != i) {
shortFrameEnable = i;
if (generalData)
delete generalData;
if (i != -1)
generalData = new ShortGotthardData();
else
generalData = new GotthardData();
bool change = false;
if (roi.size() != i.size())
change = true;
else {
for (unsigned int iloop = 0; iloop < i.size(); ++iloop) {
if (
(roi[iloop].xmin != i[iloop].xmin) ||
(roi[iloop].xmax != i[iloop].xmax) ||
(roi[iloop].ymin != i[iloop].ymin) ||
(roi[iloop].xmax != i[iloop].xmax)) {
change = true;
break;
}
}
}
if (change) {
roi = i;
generalData->SetROI(i);
framesPerFile = generalData->maxFramesPerFile;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetGeneralData(generalData);
}
FILE_LOG(logINFO) << "Short Frame Enable: " << shortFrameEnable;
std::stringstream sstm;
sstm << "ROI: ";
if (!roi.size())
sstm << "0";
else {
for (unsigned int i = 0; i < roi.size(); ++i) {
sstm << "( " <<
roi[i].xmin << ", " <<
roi[i].xmax << ", " <<
roi[i].ymin << ", " <<
roi[i].ymax << " )";
}
}
std::string message = sstm.str();
FILE_LOG(logINFO) << message;
return OK;
}
@ -216,7 +244,7 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
dataStreamEnable = enable;
//data sockets have to be created again as the client ones are
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
@ -224,13 +252,13 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
for ( int i = 0; i < numThreads; ++i ) {
try {
DataStreamer* s = new DataStreamer(i, fifo[i], &dynamicRange,
&shortFrameEnable, &fileIndex, flippedData, additionalJsonHeader);
&roi, &fileIndex, flippedData, additionalJsonHeader, &silentMode);
dataStreamer.push_back(s);
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
}
catch(...) {
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
@ -269,7 +297,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
generalData->SetDynamicRange(i,tengigaEnable);
generalData->SetGapPixelsEnable(gapPixelsEnable, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
@ -309,19 +337,6 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
}
void UDPStandardImplementation::setSilentMode(const uint32_t i){
silentMode = i;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetSilentMode(i);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetSilentMode(i);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetSilentMode(i);
FILE_LOG(logINFO) << "Silent Mode: " << i;
}
int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG(logDEBUG) << "Setting receiver type";
@ -375,22 +390,22 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
Listener* l = new Listener(i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode);
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode);
listener.push_back(l);
DataProcessor* p = new DataProcessor(i, myDetectorType, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
&framePadding,
&framePadding, &activated, &deactivatedPaddingEnable, &silentMode,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
dataProcessor.push_back(p);
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
return FAIL;
@ -398,9 +413,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
//set up writer and callbacks
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
SetThreadPriorities();
@ -424,17 +439,24 @@ void UDPStandardImplementation::setDetectorPositionId(const int i){
&detID, &numThreads, &numberOfFrames, &dynamicRange, &udpPortNum[i],
generalData);
}
for (unsigned int i = 0; i < listener.size(); ++i) {
uint16_t row = 0, col = 0;
row = detID % numDet[1]; // row
col = (detID / numDet[1]) * ((myDetectorType == EIGER) ? 2 : 1) + i; // col for horiz. udp ports
listener[i]->SetHardCodedPosition(row, col);
}
}
void UDPStandardImplementation::resetAcquisitionCount() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
FILE_LOG(logINFO) << "Acquisition Count has been reset";
@ -445,7 +467,6 @@ void UDPStandardImplementation::resetAcquisitionCount() {
int UDPStandardImplementation::startReceiver(char *c) {
cprintf(RESET,"\n");
FILE_LOG(logINFO) << "Starting Receiver";
ResetParametersforNewMeasurement();
//listener
@ -499,10 +520,10 @@ void UDPStandardImplementation::stopReceiver(){
bool running = true;
while(running) {
running = false;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
if ((*it)->IsRunning())
running = true;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
@ -513,8 +534,8 @@ void UDPStandardImplementation::stopReceiver(){
if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
@ -526,7 +547,7 @@ void UDPStandardImplementation::stopReceiver(){
running = true;
while(running) {
running = false;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
@ -573,39 +594,37 @@ void UDPStandardImplementation::stopReceiver(){
void UDPStandardImplementation::startReadout(){
if(status == RUNNING){
//needs to wait for packets only if activated
if(activated){
// wait for incoming delayed packets
//current packets caught
volatile int totalP = 0,prev=-1;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//current packets caught
volatile int totalP = 0,prev=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait as long as there is change from prev totalP,
while(prev != totalP){
//wait as long as there is change from prev totalP,
while(prev != totalP){
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n",
prev,totalP);
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n",
prev,totalP);
#endif
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
prev = totalP;
totalP = 0;
prev = totalP;
totalP = 0;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
#endif
}
}
}
//set status
status = TRANSMITTING;
FILE_LOG(logINFO) << "Status: Transmitting";
@ -616,7 +635,7 @@ void UDPStandardImplementation::startReadout(){
void UDPStandardImplementation::shutDownUDPSockets() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
}
@ -625,9 +644,9 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles();
maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
@ -643,7 +662,7 @@ int UDPStandardImplementation::setUDPSocketBufferSize(const uint32_t s) {
int UDPStandardImplementation::restreamStop() {
bool ret = OK;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
if ((*it)->RestreamStop() == FAIL)
ret = FAIL;
}
@ -663,14 +682,14 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
int max_back_log;
const char *proc_file_name = "/proc/sys/net/core/netdev_max_backlog";
{
ifstream proc_file(proc_file_name);
std::ifstream proc_file(proc_file_name);
proc_file >> max_back_log;
}
if (max_back_log < MAX_SOCKET_INPUT_PACKET_QUEUE) {
ofstream proc_file(proc_file_name);
std::ofstream proc_file(proc_file_name);
if (proc_file.good()) {
proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << endl;
proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << std::endl;
cprintf(GREEN, "Max length of input packet queue "
"[/proc/sys/net/core/netdev_max_backlog] modified to %d\n",
MAX_SOCKET_INPUT_PACKET_QUEUE);
@ -686,13 +705,13 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
void UDPStandardImplementation::SetThreadPriorities() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Could not prioritize listener threads. (No Root Privileges?)";
return;
}
}
ostringstream osfn;
std::ostringstream osfn;
osfn << "Priorities set - "
"Listener:" << LISTENER_PRIORITY;
@ -704,10 +723,10 @@ int UDPStandardImplementation::SetupFifoStructure() {
numberofJobs = 1;
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
for ( int i = 0; i < numThreads; i++ ) {
for ( int i = 0; i < numThreads; ++i ) {
//create fifo structure
try {
@ -717,7 +736,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
fifo.push_back(f);
} catch (...) {
cprintf(RED,"Error: Could not allocate memory for fifo structure of index %d\n", i);
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
return FAIL;
@ -736,15 +755,15 @@ int UDPStandardImplementation::SetupFifoStructure() {
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
if (dataStreamEnable) {
char fnametostream[MAX_STR_LENGTH];
snprintf(fnametostream, MAX_STR_LENGTH, "%s/%s", filePath, fileName);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement(fnametostream);
}
}
@ -788,15 +807,15 @@ int UDPStandardImplementation::SetupWriter() {
void UDPStandardImplementation::StartRunning() {
//set running mask and post semaphore to start the inner loop in execution thread
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
(*it)->StartRunning();
(*it)->Continue();
}
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}

View File

@ -16,7 +16,6 @@
#include <sys/wait.h> //wait
#include <unistd.h> //usleep
#include <syscall.h>
using namespace std;
bool keeprunning;
@ -44,18 +43,18 @@ void GetData(char* metadata, char* datapointer, uint32_t datasize, void* p){
slsReceiverDefs::sls_receiver_header* header = (slsReceiverDefs::sls_receiver_header*)metadata;
slsReceiverDefs::sls_detector_header detectorHeader = header->detHeader;
PRINT_IN_COLOR (detectorHeader.modId?detectorHeader.modId:detectorHeader.xCoord,
PRINT_IN_COLOR (detectorHeader.modId?detectorHeader.modId:detectorHeader.row,
"#### %d GetData: ####\n"
"frameNumber: %llu\t\texpLength: %u\t\tpacketNumber: %u\t\tbunchId: %llu"
"\t\ttimestamp: %llu\t\tmodId: %u\t\t"
"xCoord: %u\t\tyCoord: %u\t\tzCoord: %u\t\tdebug: %u"
"xCrow%u\t\tcolumn: %u\t\tcolumn: %u\t\tdebug: %u"
"\t\troundRNumber: %u\t\tdetType: %u\t\tversion: %u"
//"\t\tpacketsMask:%s"
"\t\tfirstbytedata: 0x%x\t\tdatsize: %u\n\n",
detectorHeader.xCoord, detectorHeader.frameNumber,
detectorHeader.row, detectorHeader.frameNumber,
detectorHeader.expLength, detectorHeader.packetNumber, detectorHeader.bunchId,
detectorHeader.timestamp, detectorHeader.modId,
detectorHeader.xCoord, detectorHeader.yCoord, detectorHeader.zCoord,
detectorHeader.row, detectorHeader.column, detectorHeader.column,
detectorHeader.debug, detectorHeader.roundRNumber,
detectorHeader.detType, detectorHeader.version,
//header->packetsMask.to_string().c_str(),

View File

@ -14,20 +14,16 @@
#include "slsReceiver.h"
#include "gitInfoReceiver.h"
using namespace std;
slsReceiver::slsReceiver(int argc, char *argv[], int &success):
tcpipInterface (NULL),
udp_interface (NULL)
{
success=OK;
slsReceiver::slsReceiver(int argc, char *argv[]):
tcpipInterface (0) {
// options
map<string, string> configuration_map;
std::map<std::string, std::string> configuration_map;
int tcpip_port_no = 1954;
string fname = "";
std::string fname = "";
int64_t tempval = 0;
//parse command line for config
@ -71,42 +67,33 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success):
case 'v':
tempval = GITREV;
tempval = (tempval <<32) | GITDATE;
cout << "SLS Receiver " << GITBRANCH << " (0x" << hex << tempval << ")" << endl;
success = FAIL; // to exit
break;
std::cout << "SLS Receiver " << GITBRANCH << " (0x" << std::hex << tempval << ")" << std::endl;
throw std::exception();
case 'h':
default:
string help_message = "\n"
+ string(argv[0]) + "\n"
+ "Usage: " + string(argv[0]) + " [arguments]\n"
std::string help_message = "\n"
+ std::string(argv[0]) + "\n"
+ "Usage: " + std::string(argv[0]) + " [arguments]\n"
+ "Possible arguments are:\n"
+ "\t-f, --config <fname> : Loads config from file\n"
+ "\t-t, --rx_tcpport <port> : TCP Communication Port with client. \n"
+ "\t Default: 1954. Required for multiple \n"
+ "\t receivers\n\n";
FILE_LOG(logINFO) << help_message << endl;
break;
FILE_LOG(logINFO) << help_message << std::endl;
throw std::exception();
}
}
if( !fname.empty() ){
try{
FILE_LOG(logINFO) << "config file name " << fname;
success = read_config_file(fname, &tcpip_port_no, &configuration_map);
//VERBOSE_PRINT("Read configuration file of " + iline + " lines");
}
catch(...){
FILE_LOG(logERROR) << "Coult not open configuration file " << fname ;
success = FAIL;
}
if( !fname.empty() && read_config_file(fname, &tcpip_port_no, &configuration_map) == FAIL) {
throw std::exception();
}
if (success==OK){
tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no);
}
// might throw an exception
tcpipInterface = new slsReceiverTCPIPInterface(tcpip_port_no);
}
@ -131,40 +118,26 @@ int64_t slsReceiver::getReceiverVersion(){
}
void slsReceiver::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){
//tcpipInterface
if(udp_interface)
udp_interface->registerCallBackStartAcquisition(func,arg);
else
tcpipInterface->registerCallBackStartAcquisition(func,arg);
void slsReceiver::registerCallBackStartAcquisition(int (*func)(
char*, char*, uint64_t, uint32_t, void*),void *arg){
tcpipInterface->registerCallBackStartAcquisition(func,arg);
}
void slsReceiver::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void*),void *arg){
//tcpipInterface
if(udp_interface)
udp_interface->registerCallBackAcquisitionFinished(func,arg);
else
tcpipInterface->registerCallBackAcquisitionFinished(func,arg);
void slsReceiver::registerCallBackAcquisitionFinished(
void (*func)(uint64_t, void*),void *arg){
tcpipInterface->registerCallBackAcquisitionFinished(func,arg);
}
void slsReceiver::registerCallBackRawDataReady(void (*func)(char*,
char*, uint32_t, void*),void *arg){
//tcpipInterface
if(udp_interface)
udp_interface->registerCallBackRawDataReady(func,arg);
else
tcpipInterface->registerCallBackRawDataReady(func,arg);
tcpipInterface->registerCallBackRawDataReady(func,arg);
}
void slsReceiver::registerCallBackRawDataModifyReady(void (*func)(char*,
char*, uint32_t &, void*),void *arg){
//tcpipInterface
if(udp_interface)
udp_interface->registerCallBackRawDataModifyReady(func,arg);
else
tcpipInterface->registerCallBackRawDataModifyReady(func,arg);
tcpipInterface->registerCallBackRawDataModifyReady(func,arg);
}

View File

@ -17,7 +17,7 @@
#include <fstream>
#include <stdlib.h>
#include <syscall.h>
using namespace std;
#include <vector>
@ -31,16 +31,16 @@ slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() {
delete receiverBase;
}
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn):
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int pn):
myDetectorType(GOTTHARD),
receiverBase(rbase),
receiverBase(0),
ret(OK),
fnum(-1),
lockStatus(0),
killTCPServerThread(0),
tcpThreadCreated(false),
portNumber(DEFAULT_PORTNO+2),
mySock(NULL)
mySock(0)
{
//***callback parameters***
startAcquisitionCallBack = NULL;
@ -51,83 +51,25 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
rawDataModifyReadyCallBack = NULL;
pRawDataReady = NULL;
unsigned short int port_no=portNumber;
if(receiverBase == NULL)
receiverBase = 0;
// create socket
portNumber = (pn > 0 ? pn : DEFAULT_PORTNO + 2);
MySocketTCP* m = new MySocketTCP(portNumber);
mySock = m;
if (pn>0)
port_no = pn;
//initialize variables
strcpy(mySock->lastClientIP,"none");
strcpy(mySock->thisClientIP,"none1");
memset(mess,0,sizeof(mess));
strcpy(mess,"dummy message");
success=OK;
//create socket
if(success == OK){
mySock = new MySocketTCP(port_no);
if (mySock->getErrorStatus()) {
success = FAIL;
delete mySock;
mySock=NULL;
} else {
portNumber=port_no;
//initialize variables
strcpy(mySock->lastClientIP,"none");
strcpy(mySock->thisClientIP,"none1");
memset(mess,0,sizeof(mess));
strcpy(mess,"dummy message");
function_table();
function_table();
#ifdef VERYVERBOSE
FILE_LOG(logINFO) << "Function table assigned.";
FILE_LOG(logINFO) << "Function table assigned.";
#endif
}
}
}
int slsReceiverTCPIPInterface::setPortNumber(int pn){
memset(mess, 0, sizeof(mess));
int p_number;
MySocketTCP *oldsocket = NULL;;
int sd = 0;
if (pn > 0) {
p_number = pn;
if (p_number < 1024) {
sprintf(mess,"Too low port number %d\n", p_number);
FILE_LOG(logERROR) << mess;
} else {
oldsocket=mySock;
mySock = new MySocketTCP(p_number);
if(mySock){
sd = mySock->getErrorStatus();
if (!sd){
portNumber=p_number;
strcpy(mySock->lastClientIP,oldsocket->lastClientIP);
delete oldsocket;
} else {
FILE_LOG(logERROR) << "Could not bind port " << p_number;
if (sd == -10) {
FILE_LOG(logINFO) << "Port "<< p_number << " already set";
} else {
delete mySock;
mySock=oldsocket;
}
}
} else {
mySock=oldsocket;
}
}
}
return portNumber;
}
int slsReceiverTCPIPInterface::start(){
FILE_LOG(logDEBUG) << "Creating TCP Server Thread";
killTCPServerThread = 0;
@ -198,7 +140,7 @@ void* slsReceiverTCPIPInterface::startTCPServerThread(void *this_pointer){
void slsReceiverTCPIPInterface::startTCPServer(){
cprintf(BLUE,"Created [ TCP server Tid: %ld ]\n", (long)syscall(SYS_gettid));
FILE_LOG(logINFO) << "SLS Receiver starting TCP Server on port " << portNumber << endl;
FILE_LOG(logINFO) << "SLS Receiver starting TCP Server on port " << portNumber << std::endl;
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG5) << "Starting Receiver TCP Server";
@ -265,7 +207,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_GET_RECEIVER_ID: return "F_GET_RECEIVER_ID";
case F_GET_RECEIVER_TYPE: return "F_GET_RECEIVER_TYPE";
case F_SEND_RECEIVER_DETHOSTNAME: return "F_SEND_RECEIVER_DETHOSTNAME";
case F_RECEIVER_SHORT_FRAME: return "F_RECEIVER_SHORT_FRAME";
case F_RECEIVER_SET_ROI: return "F_RECEIVER_SET_ROI";
case F_SETUP_RECEIVER_UDP: return "F_SETUP_RECEIVER_UDP";
case F_SET_RECEIVER_TIMER: return "F_SET_RECEIVER_TIMER";
case F_SET_RECEIVER_DYNAMIC_RANGE: return "F_SET_RECEIVER_DYNAMIC_RANGE";
@ -304,6 +246,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_RECEIVER_CHECK_VERSION: return "F_RECEIVER_CHECK_VERSION";
case F_RECEIVER_DISCARD_POLICY: return "F_RECEIVER_DISCARD_POLICY";
case F_RECEIVER_PADDING_ENABLE: return "F_RECEIVER_PADDING_ENABLE";
case F_RECEIVER_DEACTIVATED_PADDING_ENABLE: return "F_RECEIVER_DEACTIVATED_PADDING_ENABLE";
default: return "Unknown Function";
}
@ -321,7 +264,7 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_GET_RECEIVER_ID] = &slsReceiverTCPIPInterface::get_id;
flist[F_GET_RECEIVER_TYPE] = &slsReceiverTCPIPInterface::set_detector_type;
flist[F_SEND_RECEIVER_DETHOSTNAME] = &slsReceiverTCPIPInterface::set_detector_hostname;
flist[F_RECEIVER_SHORT_FRAME] = &slsReceiverTCPIPInterface::set_short_frame;
flist[F_RECEIVER_SET_ROI] = &slsReceiverTCPIPInterface::set_roi;
flist[F_SETUP_RECEIVER_UDP] = &slsReceiverTCPIPInterface::setup_udp;
flist[F_SET_RECEIVER_TIMER] = &slsReceiverTCPIPInterface::set_timer;
flist[F_SET_RECEIVER_DYNAMIC_RANGE] = &slsReceiverTCPIPInterface::set_dynamic_range;
@ -360,6 +303,8 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_RECEIVER_CHECK_VERSION] = &slsReceiverTCPIPInterface::check_version_compatibility;
flist[F_RECEIVER_DISCARD_POLICY] = &slsReceiverTCPIPInterface::set_discard_policy;
flist[F_RECEIVER_PADDING_ENABLE] = &slsReceiverTCPIPInterface::set_padding_enable;
flist[F_RECEIVER_DEACTIVATED_PADDING_ENABLE] = &slsReceiverTCPIPInterface::set_deactivated_receiver_padding_enable;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
@ -567,15 +512,14 @@ int slsReceiverTCPIPInterface::get_last_client_ip() {
int slsReceiverTCPIPInterface::set_port() {
ret = OK;
memset(mess, 0, sizeof(mess));
int unused = 0;
int p_type = 0;
int p_number = -1;
MySocketTCP* mySocket = NULL;
MySocketTCP* mySocket = 0;
char oldLastClientIP[INET_ADDRSTRLEN];
memset(oldLastClientIP, 0, sizeof(oldLastClientIP));
int sd = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&unused,sizeof(unused)) < 0 )
if (mySock->ReceiveDataOnly(&p_type,sizeof(p_type)) < 0 )
return printSocketReadError();
if (mySock->ReceiveDataOnly(&p_number,sizeof(p_number)) < 0 )
return printSocketReadError();
@ -587,29 +531,26 @@ int slsReceiverTCPIPInterface::set_port() {
FILE_LOG(logERROR) << mess;
}
else {
if (p_number<1024) {
if (p_number < 1024) {
ret = FAIL;
sprintf(mess,"Port Number (%d) too low\n", p_number);
FILE_LOG(logERROR) << mess;
}
FILE_LOG(logINFO) << "set port to " << p_number <<endl;
strcpy(oldLastClientIP, mySock->lastClientIP);
mySocket = new MySocketTCP(p_number);
} else {
FILE_LOG(logINFO) << "set port to " << p_number <<std::endl;
strcpy(oldLastClientIP, mySock->lastClientIP);
if(mySocket){
sd = mySocket->getErrorStatus();
if (sd < 0) {
ret = FAIL;
sprintf(mess,"Could not bind port %d\n", p_number);
FILE_LOG(logERROR) << mess;
if (sd == -10) {
ret = FAIL;
sprintf(mess,"Port %d already set\n", p_number);
FILE_LOG(logERROR) << mess;
}
}
else
try {
mySocket = new MySocketTCP(p_number);
strcpy(mySock->lastClientIP,oldLastClientIP);
} catch(SamePortSocketException e) {
ret = FAIL;
sprintf(mess, "Could not bind port %d. It is already set\n", p_number);
FILE_LOG(logERROR) << mess;
} catch (...) {
ret = FAIL;
sprintf(mess, "Could not bind port %d.\n", p_number);
FILE_LOG(logERROR) << mess;
}
}
}
@ -622,7 +563,7 @@ int slsReceiverTCPIPInterface::set_port() {
mySock->SendDataOnly(mess,sizeof(mess));
else {
mySock->SendDataOnly(&p_number,sizeof(p_number));
if(sd>=0){
if(ret != FAIL){
mySock->Disconnect();
delete mySock;
mySock = mySocket;
@ -769,6 +710,24 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// activate
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getActivate();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// deactivated padding enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getDeactivatedPadding();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// silent mode
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getSilentMode();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
if (!lockStatus)
strcpy(mySock->lastClientIP,mySock->thisClientIP);
@ -921,16 +880,24 @@ int slsReceiverTCPIPInterface::set_detector_hostname() {
int slsReceiverTCPIPInterface::set_short_frame() {
int slsReceiverTCPIPInterface::set_roi() {
ret = OK;
memset(mess, 0, sizeof(mess));
int index = 0;
int retval = -100;
int nroi = 0;
// receive arguments
if (mySock->ReceiveDataOnly(&index,sizeof(index)) < 0 )
if (mySock->ReceiveDataOnly(&nroi,sizeof(nroi)) < 0 )
return printSocketReadError();
std::vector <ROI> roiLimits;
int iloop = 0;
for (iloop = 0; iloop < nroi; iloop++) {
ROI temp;
if ( mySock->ReceiveDataOnly(&temp,sizeof(ROI)) < 0 )
return printSocketReadError();
roiLimits.push_back(temp);
}
//does not exist
if (myDetectorType != GOTTHARD)
functionNotImplemented();
@ -946,8 +913,8 @@ int slsReceiverTCPIPInterface::set_short_frame() {
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setShortFrameEnable(index);
retval = receiverBase->getShortFrameEnable();
ret = receiverBase->setROI(roiLimits);
//retval = receiverBase->getROI();
}
#endif
}
@ -958,7 +925,8 @@ int slsReceiverTCPIPInterface::set_short_frame() {
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
roiLimits.clear();
// return ok/fail
return ret;
@ -999,7 +967,7 @@ int slsReceiverTCPIPInterface::setup_udp(){
//setup udpip
//get ethernet interface or IP to listen to
FILE_LOG(logINFO) << "Receiver UDP IP: " << args[0];
string temp = genericSocket::ipToName(args[0]);
std::string temp = genericSocket::ipToName(args[0]);
if (temp == "none"){
ret = FAIL;
strcpy(mess, "Failed to get ethernet interface or IP\n");
@ -1086,8 +1054,8 @@ int slsReceiverTCPIPInterface::set_timer() {
case SUBFRAME_ACQUISITION_TIME:
receiverBase->setSubExpTime(index[1]);
break;
case SUBFRAME_PERIOD:
receiverBase->setSubPeriod(index[1]);
case SUBFRAME_DEADTIME:
receiverBase->setSubPeriod(index[1] + receiverBase->getSubExpTime());
break;
case SAMPLES_JCTB:
if (myDetectorType != JUNGFRAUCTB) {
@ -1121,8 +1089,8 @@ int slsReceiverTCPIPInterface::set_timer() {
case SUBFRAME_ACQUISITION_TIME:
retval=receiverBase->getSubExpTime();
break;
case SUBFRAME_PERIOD:
retval=receiverBase->getSubPeriod();
case SUBFRAME_DEADTIME:
retval=(receiverBase->getSubPeriod() - receiverBase->getSubExpTime());
break;
case SAMPLES_JCTB:
if (myDetectorType != JUNGFRAUCTB) {
@ -1994,11 +1962,11 @@ int slsReceiverTCPIPInterface::set_activate() {
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setActivate(enable);
receiverBase->setActivate(enable > 0 ? true : false);
}
}
//get
retval = receiverBase->getActivate();
retval = (int)receiverBase->getActivate();
if(enable >= 0 && retval != enable){
ret = FAIL;
sprintf(mess,"Could not set activate to %d, returned %d\n",enable,retval);
@ -2484,7 +2452,7 @@ int slsReceiverTCPIPInterface::set_silent_mode() {
}
}
//get
retval = receiverBase->getSilentMode(); // no check required
retval = (int)receiverBase->getSilentMode(); // no check required
}
#endif
#ifdef VERYVERBOSE
@ -2548,7 +2516,7 @@ int slsReceiverTCPIPInterface::enable_gap_pixels() {
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "Activate: " << retval;
FILE_LOG(logDEBUG1) << "Gap Pixels Enable: " << retval;
#endif
if (ret == OK && mySock->differentClients)
@ -2959,3 +2927,59 @@ int slsReceiverTCPIPInterface::set_padding_enable() {
int slsReceiverTCPIPInterface::set_deactivated_receiver_padding_enable() {
ret = OK;
memset(mess, 0, sizeof(mess));
int enable = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&enable,sizeof(enable)) < 0 )
return printSocketReadError();
if (myDetectorType != EIGER)
functionNotImplemented();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
else {
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(enable >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setDeactivatedPadding(enable > 0 ? true : false);
}
}
//get
retval = (int)receiverBase->getDeactivatedPadding();
if(enable >= 0 && retval != enable){
ret = FAIL;
sprintf(mess,"Could not set deactivated padding enable to %d, returned %d\n",enable,retval);
FILE_LOG(logERROR) << mess;
}
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "Deactivated Padding Enable: " << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}

View File

@ -2,11 +2,18 @@
#include "slsReceiver.h"
slsReceiverUsers::slsReceiverUsers(int argc, char *argv[], int &success) {
receiver=new slsReceiver(argc, argv, success);
// catch the exception here to limit it to within the library (for current version)
try {
slsReceiver* r = new slsReceiver(argc, argv);
receiver = r;
success = slsReceiverDefs::OK;
} catch (...) {
success = slsReceiverDefs::FAIL;
}
}
slsReceiverUsers::~slsReceiverUsers() {
delete receiver;
delete receiver;
}
int slsReceiverUsers::start() {
@ -28,13 +35,13 @@ void slsReceiverUsers::registerCallBackStartAcquisition(int (*func)(char*, char*
void slsReceiverUsers::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void*),void *arg){
receiver->registerCallBackAcquisitionFinished(func,arg);
}
void slsReceiverUsers::registerCallBackRawDataReady(void (*func)(char* header,
char* datapointer, uint32_t datasize, void*), void *arg){
receiver->registerCallBackRawDataReady(func,arg);
}
void slsReceiverUsers::registerCallBackRawDataModifyReady(void (*func)(char* header,
char* datapointer, uint32_t& revDatasize, void*), void *arg){
receiver->registerCallBackRawDataModifyReady(func,arg);
char* datapointer, uint32_t& revDatasize, void*), void *arg){
receiver->registerCallBackRawDataModifyReady(func,arg);
}

View File

@ -8,36 +8,42 @@
#include <map>
#include "utilities.h"
#include "logger.h"
using namespace std;
int read_config_file(string fname, int *tcpip_port_no, map<string, string> * configuration_map ){
int read_config_file(std::string fname, int *tcpip_port_no, std::map<std::string, std::string> * configuration_map ){
ifstream infile;
string sLine,sargname, sargvalue;
std::ifstream infile;
std::string sLine,sargname, sargvalue;
int iline = 0;
int success = slsReceiverDefs::OK;
FILE_LOG(logINFO) << "config file name " << fname;
try {
infile.open(fname.c_str(), std::ios_base::in);
} catch(...) {
FILE_LOG(logERROR) << "Could not open configuration file " << fname ;
success = slsReceiverDefs::FAIL;
}
infile.open(fname.c_str(), ios_base::in);
if (infile.is_open()) {
if (success == slsReceiverDefs::OK && infile.is_open()) {
while(infile.good()){
getline(infile,sLine);
iline++;
//VERBOSE_PRINT(sLine);
if(sLine.find('#') != string::npos)
if(sLine.find('#') != std::string::npos)
continue;
else if(sLine.length()<2)
continue;
else{
istringstream sstr(sLine);
std::istringstream sstr(sLine);
//parameter name
if(sstr.good()){