merging refactor (replacing)

This commit is contained in:
2019-04-12 10:53:09 +02:00
parent 0bb800cc8a
commit 89a06f099c
1176 changed files with 82698 additions and 159058 deletions

View File

@@ -1,51 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?fileVersion 4.0.0?>
<cproject storage_type_id="org.eclipse.cdt.core.XmlProjectDescriptionStorage">
<storageModule moduleId="org.eclipse.cdt.core.settings">
<cconfiguration id="0.1592123610">
<storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="0.1592123610" moduleId="org.eclipse.cdt.core.settings" name="Default">
<externalSettings/>
<extensions>
<extension id="org.eclipse.cdt.core.VCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
<extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
</extensions>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
<configuration buildProperties="" description="" id="0.1592123610" name="Default" parent="org.eclipse.cdt.build.core.prefbase.cfg">
<folderInfo id="0.1592123610." name="/" resourcePath="">
<toolChain id="org.eclipse.cdt.build.core.prefbase.toolchain.2043853512" name="No ToolChain" resourceTypeBasedDiscovery="false" superClass="org.eclipse.cdt.build.core.prefbase.toolchain">
<targetPlatform id="org.eclipse.cdt.build.core.prefbase.toolchain.2043853512.1428560959" name=""/>
<builder id="org.eclipse.cdt.build.core.settings.default.builder.1228246701" keepEnvironmentInBuildfile="false" managedBuildOn="false" name="Gnu Make Builder" superClass="org.eclipse.cdt.build.core.settings.default.builder"/>
<tool id="org.eclipse.cdt.build.core.settings.holder.libs.891716849" name="holder for library settings" superClass="org.eclipse.cdt.build.core.settings.holder.libs"/>
<tool id="org.eclipse.cdt.build.core.settings.holder.1775606856" name="Assembly" superClass="org.eclipse.cdt.build.core.settings.holder">
<inputType id="org.eclipse.cdt.build.core.settings.holder.inType.98773194" languageId="org.eclipse.cdt.core.assembly" languageName="Assembly" sourceContentType="org.eclipse.cdt.core.asmSource" superClass="org.eclipse.cdt.build.core.settings.holder.inType"/>
</tool>
<tool id="org.eclipse.cdt.build.core.settings.holder.41809089" name="GNU C++" superClass="org.eclipse.cdt.build.core.settings.holder">
<inputType id="org.eclipse.cdt.build.core.settings.holder.inType.817253715" languageId="org.eclipse.cdt.core.g++" languageName="GNU C++" sourceContentType="org.eclipse.cdt.core.cxxSource,org.eclipse.cdt.core.cxxHeader" superClass="org.eclipse.cdt.build.core.settings.holder.inType"/>
</tool>
<tool id="org.eclipse.cdt.build.core.settings.holder.1467561536" name="GNU C" superClass="org.eclipse.cdt.build.core.settings.holder">
<inputType id="org.eclipse.cdt.build.core.settings.holder.inType.210475521" languageId="org.eclipse.cdt.core.gcc" languageName="GNU C" sourceContentType="org.eclipse.cdt.core.cSource,org.eclipse.cdt.core.cHeader" superClass="org.eclipse.cdt.build.core.settings.holder.inType"/>
</tool>
</toolChain>
</folderInfo>
</configuration>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
</cconfiguration>
</storageModule>
<storageModule moduleId="cdtBuildSystem" version="4.0.0">
<project id="slsReceiver.null.670781774" name="slsReceiver"/>
</storageModule>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
<scannerConfigBuildInfo instanceId="0.1592123610">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
</cproject>

View File

@@ -1,28 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>Receiver</name>
<comment></comment>
<projects>
<project>newMythenSoftware</project>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
<triggers>clean,full,incremental,</triggers>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.cdt.managedbuilder.core.ScannerConfigBuilder</name>
<triggers>full,incremental,</triggers>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.cdt.core.cnature</nature>
<nature>org.eclipse.cdt.core.ccnature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
<nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
</natures>
</projectDescription>

13
slsReceiverSoftware/src/BinaryFile.cpp Normal file → Executable file
View File

@@ -11,14 +11,14 @@
#include <iostream>
FILE* BinaryFile::masterfd = 0;
FILE* BinaryFile::masterfd = nullptr;
BinaryFile::BinaryFile(int ind, uint32_t* maxf,
int* nd, char* fname, char* fpath, uint64_t* findex, bool* owenable,
int* dindex, int* nunits, uint64_t* nf, uint32_t* dr, uint32_t* portno,
bool* smode):
File(ind, maxf, nd, fname, fpath, findex, owenable, dindex, nunits, nf, dr, portno, smode),
filefd(0),
filefd(nullptr),
numFramesInFile(0),
numActualPacketsInFile(0)
{
@@ -31,13 +31,13 @@ BinaryFile::~BinaryFile() {
CloseAllFiles();
}
void BinaryFile::PrintMembers() {
File::PrintMembers();
void BinaryFile::PrintMembers(TLogLevel level) {
File::PrintMembers(level);
FILE_LOG(logINFO) << "Max Frames Per File: " << *maxFramesPerFile;
FILE_LOG(logINFO) << "Number of Frames in File: " << numFramesInFile;
}
slsReceiverDefs::fileFormat BinaryFile::GetFileType() {
slsDetectorDefs::fileFormat BinaryFile::GetFileType() {
return BINARY;
}
@@ -106,8 +106,7 @@ int BinaryFile::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_
// if write error
if (ret != buffersize) {
cprintf(RED,"%d Error: Write to file failed for image number %lld\n",
index, (long long int)fnum);
FILE_LOG(logERROR) << index << " Error: Write to file failed for image number " << fnum;
return FAIL;
}
return OK;

115
slsReceiverSoftware/src/DataProcessor.cpp Normal file → Executable file
View File

@@ -14,6 +14,7 @@
#include "HDF5File.h"
#endif
#include "DataStreamer.h"
#include "sls_detector_exceptions.h"
#include <iostream>
#include <errno.h>
@@ -22,21 +23,18 @@
const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo* f,
fileFormat* ftype, bool fwenable,
bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
bool* fp, bool* act, bool* depaden, bool* sm,
void (*dataReadycb)(char*, char*, uint32_t, void*),
void (*dataModifyReadycb)(char*, char*, uint32_t &, void*),
void *pDataReadycb) :
bool* fp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
file(0),
file(nullptr),
dataStreamEnable(dsEnable),
fileFormatType(ftype),
fileWriteEnable(fwenable),
@@ -45,7 +43,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
streamingFrequency(freq),
streamingTimerInMs(timer),
currentFreqCount(0),
tempBuffer(0),
tempBuffer(nullptr),
activated(act),
deactivatedPaddingEnable(depaden),
silentMode(sm),
@@ -56,13 +54,10 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
firstMeasurementIndex(0),
numTotalFramesCaught(0),
numFramesCaught(0),
currentFrameIndex(0),
rawDataReadyCallBack(dataReadycb),
rawDataModifyReadyCallBack(dataModifyReadycb),
pRawDataReady(pDataReadycb)
currentFrameIndex(0)
{
if(ThreadObject::CreateThread() == FAIL)
throw std::exception();
throw sls::RuntimeError("Could not create processing thread");
FILE_LOG(logDEBUG) << "DataProcessor " << ind << " created";
@@ -125,7 +120,7 @@ void DataProcessor::StopRunning() {
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo*& f) {
void DataProcessor::SetFifo(Fifo* f) {
fifo = f;
}
@@ -144,7 +139,7 @@ void DataProcessor::ResetParametersforNewMeasurement(){
if (tempBuffer) {
delete [] tempBuffer;
tempBuffer = 0;
tempBuffer = nullptr;
}
if (*gapPixelsEnable) {
tempBuffer = new char[generalData->imageSize];
@@ -166,18 +161,14 @@ void DataProcessor::RecordFirstIndices(uint64_t fnum) {
firstAcquisitionIndex = fnum;
}
#ifdef VERBOSE
cprintf(BLUE,"%d First Acquisition Index:%lld\tFirst Measurement Index:%lld\n",
index, (long long int)firstAcquisitionIndex, (long long int)firstMeasurementIndex);
#endif
FILE_LOG(logDEBUG1) << index << " First Acquisition Index:" << firstAcquisitionIndex <<
"\tFirst Measurement Index:" << firstMeasurementIndex;
}
void DataProcessor::SetGeneralData(GeneralData* g) {
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
if (file) {
if (file->GetFileType() == HDF5) {
file->SetNumberofPixels(generalData->nPixelsX, generalData->nPixelsY);
@@ -200,10 +191,10 @@ void DataProcessor::SetFileFormat(const fileFormat f) {
if (file && file->GetFileType() != f) {
//remember the pointer values before they are destroyed
int nd[MAX_DIMENSIONS];nd[0] = 0; nd[1] = 0;
uint32_t* maxf = 0;
char* fname=0; char* fpath=0; uint64_t* findex=0;
bool* owenable=0; int* dindex=0; int* nunits=0; uint64_t* nf = 0;
uint32_t* dr = 0; uint32_t* port = 0;
uint32_t* maxf = nullptr;
char* fname=nullptr; char* fpath=nullptr; uint64_t* findex=nullptr;
bool* owenable=nullptr; int* dindex=nullptr; int* nunits=nullptr; uint64_t* nf = nullptr;
uint32_t* dr = nullptr; uint32_t* port = nullptr;
file->GetMemberPointerValues(nd, maxf, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, port);
//create file writer with same pointers
@@ -225,7 +216,7 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf,
if (file) {
delete file; file = 0;
delete file; file = nullptr;
}
if (fileWriteEnable) {
@@ -250,7 +241,7 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, uint32_t* maxf,
// only the first file
int DataProcessor::CreateNewFile(bool en, uint64_t nf, uint64_t at, uint64_t st,
uint64_t sp, uint64_t ap) {
if (file == NULL)
if (file == nullptr)
return FAIL;
file->CloseAllFiles();
if (file->CreateMasterFile(en, generalData->imageSize,
@@ -276,18 +267,14 @@ void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
void DataProcessor::ThreadExecution() {
char* buffer=0;
char* buffer=nullptr;
fifo->PopAddress(buffer);
#ifdef FIFODEBUG
if (!index) cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n",
index,(void*)(buffer),buffer);
#endif
FILE_LOG(logDEBUG5) << "DataProcessor " << index << ", "
"pop 0x" << std::hex << (void*)(buffer) << std::dec << ":" << buffer;
//check dummy
uint32_t numBytes = (uint32_t)(*((uint32_t*)buffer));
#ifdef VERBOSE
if (!index) cprintf(BLUE,"DataProcessor %d, Numbytes:%u\n", index,numBytes);
#endif
FILE_LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
@@ -304,10 +291,8 @@ void DataProcessor::ThreadExecution() {
void DataProcessor::StopProcessing(char* buf) {
#ifdef VERBOSE
if (!index)
cprintf(RED,"DataProcessing %d: Dummy\n", index);
#endif
FILE_LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
//stream or free
if (*dataStreamEnable)
fifo->PushAddressToStream(buf);
@@ -317,9 +302,7 @@ void DataProcessor::StopProcessing(char* buf) {
if (file)
file->CloseCurrentFile();
StopRunning();
#ifdef VERBOSE
FILE_LOG(logINFO) << index << ": Processing Completed";
#endif
FILE_LOG(logDEBUG1) << index << ": Processing Completed";
}
@@ -335,16 +318,9 @@ void DataProcessor::ProcessAnImage(char* buf) {
numTotalFramesCaught++;
}
#ifdef VERBOSE
if (!index)
cprintf(BLUE,"DataProcessing %d: fnum:%lu\n", index, fnum);
#endif
FILE_LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum;
if (!measurementStartedFlag) {
#ifdef VERBOSE
if (!index) cprintf(BLUE,"DataProcessing %d: fnum:%lu\n", index, fnum);
#endif
RecordFirstIndices(fnum);
if (*dataStreamEnable) {
@@ -367,7 +343,7 @@ void DataProcessor::ProcessAnImage(char* buf) {
if (*activated && *framePadding && nump < generalData->packetsPerFrame)
PadMissingPackets(buf);
// deactivated and padding enabled
// deactivated and padding enabled
else if (!(*activated) && *deactivatedPaddingEnable)
PadMissingPackets(buf);
@@ -420,11 +396,10 @@ bool DataProcessor::SendToStreamer() {
bool DataProcessor::CheckTimer() {
struct timespec end;
clock_gettime(CLOCK_REALTIME, &end);
#ifdef VERBOSE
cprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index,
( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec )
/ 1000000000.0);
#endif
FILE_LOG(logDEBUG1) << index << " Timer elapsed time:" <<
(( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0)
<< " seconds";
//still less than streaming timer, keep waiting
if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec )
/ 1000000000.0) < ((double)*streamingTimerInMs/1000.00))
@@ -454,6 +429,19 @@ void DataProcessor::SetPixelDimension() {
}
}
void DataProcessor::registerCallBackRawDataReady(void (*func)(char* ,
char*, uint32_t, void*),void *arg) {
rawDataReadyCallBack=func;
pRawDataReady=arg;
}
void DataProcessor::registerCallBackRawDataModifyReady(void (*func)(char* ,
char*, uint32_t&, void*),void *arg) {
rawDataModifyReadyCallBack=func;
pRawDataReady=arg;
}
void DataProcessor::PadMissingPackets(char* buf) {
FILE_LOG(logDEBUG) << index << ": Padding Missing Packets";
@@ -466,9 +454,8 @@ void DataProcessor::PadMissingPackets(char* buf) {
uint32_t dsize = generalData->dataSize;
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - generalData->imageSize);
#ifdef VERBOSE
cprintf(RED,"bitmask:%s\n", pmask.to_string().c_str());
#endif
FILE_LOG(logDEBUG1) << "bitmask: " << pmask.to_string();
for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) {
// not missing packet
@@ -491,7 +478,7 @@ void DataProcessor::PadMissingPackets(char* buf) {
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize+2);
break;
case JUNGFRAUCTB:
case CHIPTESTBOARD:
if (pnum == (pperFrame-1))
memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize);
else
@@ -515,8 +502,8 @@ void DataProcessor::InsertGapPixels(char* buf, uint32_t dr) {
const uint32_t ny = generalData->nPixelsY;
const uint32_t npx = nx * ny;
char* srcptr = 0;
char* dstptr = 0;
char* srcptr = nullptr;
char* dstptr = nullptr;
const uint32_t b1px = generalData->imageSize / (npx); // not double as not dealing with 4 bit mode
const uint32_t b2px = 2 * b1px;
@@ -538,8 +525,8 @@ void DataProcessor::InsertGapPixels(char* buf, uint32_t dr) {
// vertical filling of values
{
char* srcgp1 = 0; char* srcgp2 = 0; char* srcgp3 = 0;
char* dstgp1 = 0; char* dstgp2 = 0; char* dstgp3 = 0;
char* srcgp1 = nullptr; char* srcgp2 = nullptr; char* srcgp3 = nullptr;
char* dstgp1 = nullptr; char* dstgp2 = nullptr; char* dstgp3 = nullptr;
const uint32_t b3px = 3 * b1px;
srcptr = tempBuffer + b1line;

104
slsReceiverSoftware/src/DataStreamer.cpp Normal file → Executable file
View File

@@ -8,6 +8,7 @@
#include "GeneralData.h"
#include "Fifo.h"
#include "ZmqSocket.h"
#include "sls_detector_exceptions.h"
#include <iostream>
#include <errno.h>
@@ -15,13 +16,13 @@
const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, std::vector<ROI>* r,
uint64_t* fi, int* fd, char* ajh, bool* sm) :
DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, std::vector<ROI>* r,
uint64_t* fi, int* fd, char* ajh) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
generalData(nullptr),
fifo(f),
zmqSocket(0),
zmqSocket(nullptr),
dynamicRange(dr),
roi(r),
adcConfigured(-1),
@@ -32,14 +33,14 @@ DataStreamer::DataStreamer(int ind, Fifo*& f, uint32_t* dr, std::vector<ROI>* r,
measurementStartedFlag(false),
firstAcquisitionIndex(0),
firstMeasurementIndex(0),
completeBuffer(0)
completeBuffer(nullptr)
{
if(ThreadObject::CreateThread() == FAIL)
throw std::exception();
throw sls::RuntimeError("Could not create streaming thread");
FILE_LOG(logDEBUG) << "DataStreamer " << ind << " created";
memset(fileNametoStream, 0, MAX_STR_LENGTH);
// memset(fileNametoStream, 0, MAX_STR_LENGTH);
}
@@ -69,7 +70,7 @@ void DataStreamer::StopRunning() {
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo*& f) {
void DataStreamer::SetFifo(Fifo* f) {
fifo = f;
}
@@ -78,14 +79,15 @@ void DataStreamer::ResetParametersforNewAcquisition() {
acquisitionStartedFlag = false;
}
void DataStreamer::ResetParametersforNewMeasurement(char* fname){
void DataStreamer::ResetParametersforNewMeasurement(const std::string& fname){
runningFlag = false;
firstMeasurementIndex = 0;
measurementStartedFlag = false;
strcpy(fileNametoStream, fname);
if (completeBuffer) {
delete [] completeBuffer;
completeBuffer = 0;
// strcpy(fileNametoStream, fname);
fileNametoStream = fname;
if (completeBuffer) {
delete[] completeBuffer;
completeBuffer = nullptr;
}
if (roi->size()) {
if (generalData->myDetectorType == GOTTHARD) {
@@ -107,18 +109,14 @@ void DataStreamer::RecordFirstIndices(uint64_t fnum) {
firstAcquisitionIndex = fnum;
}
#ifdef VERBOSE
cprintf(BLUE,"%d First Acquisition Index:%lld\tFirst Measurement Index:%lld\n",
index, (long long int)firstAcquisitionIndex, (long long int)firstMeasurementIndex);
#endif
FILE_LOG(logDEBUG1) << index << " First Acquisition Index: " << firstAcquisitionIndex <<
"\tFirst Measurement Index: " << firstMeasurementIndex;
}
void DataStreamer::SetGeneralData(GeneralData* g) {
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
}
int DataStreamer::SetThreadPriority(int priority) {
@@ -135,9 +133,9 @@ void DataStreamer::CreateZmqSockets(int* nunits, uint32_t port, const char* srci
uint32_t portnum = port + index;
try {
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:NULL));
zmqSocket = new ZmqSocket(portnum, (strlen(srcip)?srcip:nullptr));
} catch (...) {
cprintf(RED, "Error: Could not create Zmq socket on port %d for Streamer %d\n", portnum, index);
FILE_LOG(logERROR) << "Could not create Zmq socket on port " << portnum << " for Streamer " << index;
throw;
}
FILE_LOG(logINFO) << index << " Streamer: Zmq Server started at " << zmqSocket->GetZmqServerAddress();
@@ -147,23 +145,21 @@ void DataStreamer::CreateZmqSockets(int* nunits, uint32_t port, const char* srci
void DataStreamer::CloseZmqSocket() {
if (zmqSocket) {
delete zmqSocket;
zmqSocket = 0;
zmqSocket = nullptr;
}
}
void DataStreamer::ThreadExecution() {
char* buffer=0;
char* buffer=nullptr;
fifo->PopAddressToStream(buffer);
#ifdef FIFODEBUG
if (!index) cprintf(BLUE,"DataStreamer %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
FILE_LOG(logDEBUG5) << "DataStreamer " << index << ", "
"pop 0x" << std::hex << (void*)(buffer) << std::dec << ":" << buffer;
//check dummy
uint32_t numBytes = (uint32_t)(*((uint32_t*)buffer));
#ifdef VERBOSE
cprintf(GREEN,"DataStreamer %d, Numbytes:%u\n", index,numBytes);
#endif
FILE_LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
@@ -179,20 +175,17 @@ void DataStreamer::ThreadExecution() {
void DataStreamer::StopProcessing(char* buf) {
#ifdef VERBOSE
if (!index)
cprintf(RED,"DataStreamer %d: Dummy\n", index);
#endif
FILE_LOG(logDEBUG1) << "DataStreamer " << index << ": Dummy";
sls_receiver_header* header = (sls_receiver_header*) (buf);
//send dummy header and data
if (!SendHeader(header, 0, 0, 0, true))
cprintf(RED,"Error: Could not send zmq dummy header for streamer %d\n", index);
if (!SendHeader(header, 0, 0, 0, true)) {
FILE_LOG(logERROR) << "Could not send zmq dummy header for streamer " << index;
}
fifo->FreeAddress(buf);
StopRunning();
#ifdef VERBOSE
FILE_LOG(logINFO) << index << ": Streaming Completed";
#endif
FILE_LOG(logDEBUG1) << index << ": Streaming Completed";
}
/** buf includes only the standard header */
@@ -200,14 +193,9 @@ void DataStreamer::ProcessAnImage(char* buf) {
sls_receiver_header* header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
uint64_t fnum = header->detHeader.frameNumber;
#ifdef VERBOSE
cprintf(MAGENTA,"DataStreamer %d: fnum:%lu\n", index,fnum);
#endif
FILE_LOG(logDEBUG1) << "DataStreamer " << index << ": fnum:" << fnum;
if (!measurementStartedFlag) {
#ifdef VERBOSE
if (!index) cprintf(MAGENTA,"DataStreamer %d: fnum:%lu\n", index, fnum);
#endif
RecordFirstIndices(fnum);
}
@@ -219,17 +207,16 @@ void DataStreamer::ProcessAnImage(char* buf) {
//write imagesize
if (!SendHeader(header, generalData->imageSizeComplete,
generalData->nPixelsXComplete, generalData->nPixelsYComplete, false))
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
generalData->nPixelsXComplete, generalData->nPixelsYComplete, false)) {
FILE_LOG(logERROR) << "Could not send zmq header for fnum " << fnum << " and streamer " << index;
}
memcpy(completeBuffer + ((generalData->imageSize) * adcConfigured),
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t*)buf)) );
if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete))
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete)) {
FILE_LOG(logERROR) << "Could not send zmq data for fnum " << fnum << " and streamer " << index;
}
}
@@ -237,14 +224,13 @@ void DataStreamer::ProcessAnImage(char* buf) {
else {
if (!SendHeader(header, (uint32_t)(*((uint32_t*)buf)),
generalData->nPixelsX, generalData->nPixelsY, false)) // new size possibly from callback
cprintf(RED,"Error: Could not send zmq header for fnum %lld and streamer %d\n",
(long long int) fnum, index);
generalData->nPixelsX, generalData->nPixelsY, false)) {// new size possibly from callback
FILE_LOG(logERROR) << "Could not send zmq header for fnum " << fnum << " and streamer " << index;
}
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t*)buf)) )) // new size possibly from callback
cprintf(RED,"Error: Could not send zmq data for fnum %lld and streamer %d\n",
(long long int) fnum, index);
(uint32_t)(*((uint32_t*)buf)) )) {// new size possibly from callback
FILE_LOG(logERROR) << "Could not send zmq data for fnum " << fnum << " and streamer " << index;
}
}
}
@@ -262,7 +248,7 @@ int DataStreamer::SendHeader(sls_receiver_header* rheader, uint32_t size, uint32
return zmqSocket->SendHeaderData(index, dummy, SLS_DETECTOR_JSON_HEADER_VERSION, *dynamicRange, *fileIndex,
nx, ny, size,
acquisitionIndex, frameIndex, fileNametoStream,
acquisitionIndex, frameIndex, fileNametoStream.c_str(),
header.frameNumber, header.expLength, header.packetNumber, header.bunchId, header.timestamp,
header.modId, header.row, header.column, header.reserved,
header.debug, header.roundRNumber,

30
slsReceiverSoftware/src/Fifo.cpp Normal file → Executable file
View File

@@ -6,6 +6,7 @@
***********************************************/
#include "Fifo.h"
#include "sls_detector_exceptions.h"
#include <iostream>
#include <cstdlib>
@@ -14,29 +15,28 @@
Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t depth):
index(ind),
memory(0),
fifoBound(0),
fifoFree(0),
fifoStream(0),
memory(nullptr),
fifoBound(nullptr),
fifoFree(nullptr),
fifoStream(nullptr),
fifoDepth(depth),
status_fifoBound(0),
status_fifoFree(depth){
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
if(CreateFifos(fifoItemSize) == FAIL)
throw std::exception();
throw sls::RuntimeError("Could not create FIFO");
}
Fifo::~Fifo() {
FILE_LOG(logDEBUG) << __AT__ << " called";
//cprintf(BLUE,"Fifo Object %d: Goodbye\n", index);
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
DestroyFifos();
}
int Fifo::CreateFifos(uint32_t fifoItemSize) {
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
//destroy if not already
DestroyFifos();
@@ -48,7 +48,7 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) {
//allocate memory
size_t mem_len = fifoItemSize * fifoDepth * sizeof(char);
memory = (char*) malloc (mem_len);
if (memory == NULL){
if (memory == nullptr){
FILE_LOG(logERROR) << "Could not allocate memory for fifos";
return FAIL;
}
@@ -69,23 +69,23 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) {
void Fifo::DestroyFifos(){
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logDEBUG3) << __SHORT_AT__ << " called";
if(memory) {
free(memory);
memory = 0;
memory = nullptr;
}
if (fifoBound) {
delete fifoBound;
fifoBound = 0;
fifoBound = nullptr;
}
if (fifoFree) {
delete fifoFree;
fifoFree = 0;
fifoFree = nullptr;
}
if (fifoStream) {
delete fifoStream;
fifoStream = 0;
fifoStream = nullptr;
}
}

4
slsReceiverSoftware/src/File.cpp Normal file → Executable file
View File

@@ -38,8 +38,8 @@ std::string File::GetCurrentFileName() {
return currentFileName;
}
void File::PrintMembers() {
FILE_LOG(logINFO) << "\nGeneral Writer Variables:" << std::endl
void File::PrintMembers(TLogLevel level) {
FILE_LOG(level) << "\nGeneral Writer Variables:" << std::endl
<< "Index: " << index << std::endl
<< "Max Frames Per File: " << *maxFramesPerFile << std::endl
<< "Number of Detectors in x dir: " << numDetX << std::endl

21
slsReceiverSoftware/src/HDF5File.cpp Normal file → Executable file
View File

@@ -38,9 +38,7 @@ HDF5File::HDF5File(int ind, uint32_t* maxf,
dataspace_para(0),
extNumImages(0)
{
#ifdef VERBOSE
PrintMembers();
#endif
dataset_para.clear();
parameterNames.clear();
parameterDataTypes.clear();
@@ -95,15 +93,15 @@ HDF5File::~HDF5File() {
CloseAllFiles();
}
void HDF5File::PrintMembers() {
void HDF5File::PrintMembers(TLogLevel level) {
File::PrintMembers();
UpdateDataType();
if (datatype == PredType::STD_U8LE) {
FILE_LOG(logINFO) << "Data Type: 4 or 8";
FILE_LOG(level) << "Data Type: 4 or 8";
} else if (datatype == PredType::STD_U16LE) {
FILE_LOG(logINFO) << "Data Type: 16";
FILE_LOG(level) << "Data Type: 16";
} else if (datatype == PredType::STD_U32LE) {
FILE_LOG(logINFO) << "Data Type: 32";
FILE_LOG(level) << "Data Type: 32";
} else {
FILE_LOG(logERROR) << "unknown data type";
}
@@ -116,7 +114,7 @@ void HDF5File::SetNumberofPixels(uint32_t nx, uint32_t ny) {
}
slsReceiverDefs::fileFormat HDF5File::GetFileType() {
slsDetectorDefs::fileFormat HDF5File::GetFileType() {
return HDF5;
}
@@ -154,8 +152,6 @@ int HDF5File::CreateFile(uint64_t fnum) {
return FAIL;
}
pthread_mutex_unlock(&Mutex);
if (dataspace == NULL)
cprintf(RED,"Got nothing!\n");
if(!(*silentMode)) {
FILE_LOG(logINFO) << *udpPortNumber << ": HDF5 File created: " << currentFileName;
@@ -214,9 +210,8 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
if (HDF5FileStatic::ExtendDataset(index, dataspace, dataset,
dataspace_para, dataset_para, *numImages) == OK) {
if (!(*silentMode)) {
cprintf(BLUE,"%d Extending HDF5 dataset by %llu, Total x Dimension: %llu\n",
index, (long long unsigned int)extNumImages,
(long long unsigned int)(extNumImages + *numImages));
FILE_LOG(logINFO) << index << " Extending HDF5 dataset by " <<
extNumImages << ", Total x Dimension: " << (extNumImages + *numImages);
}
extNumImages += *numImages;
}
@@ -238,7 +233,7 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
}
}
pthread_mutex_unlock(&Mutex);
cprintf(RED,"%d Error: Write to file failed\n", index);
FILE_LOG(logERROR) << index << "Write to file failed";
return FAIL;
}

210
slsReceiverSoftware/src/Listener.cpp Normal file → Executable file
View File

@@ -10,6 +10,8 @@
#include "GeneralData.h"
#include "Fifo.h"
#include "genericSocket.h"
#include "container_utils.h" // For sls::make_unique<>
#include "sls_detector_exceptions.h"
#include <iostream>
#include <errno.h>
@@ -18,17 +20,17 @@
const std::string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
Listener::Listener(int ind, detectorType dtype, Fifo* f, runStatus* s,
uint32_t* portno, char* e, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as, uint32_t* fpf,
int64_t* us, int64_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp, bool* act, bool* depaden, bool* sm) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
generalData(nullptr),
fifo(f),
myDetectorType(dtype),
status(s),
udpSocket(0),
udpSocket(nullptr),
udpPortNumber(portno),
eth(e),
numImages(nf),
@@ -50,15 +52,13 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
lastCaughtFrameIndex(0),
currentFrameIndex(0),
carryOverFlag(0),
carryOverPacket(0),
listeningPacket(0),
udpSocketAlive(0),
numPacketsStatistic(0),
numFramesStatistic(0),
oddStartingPacket(true)
{
if(ThreadObject::CreateThread() == FAIL)
throw std::exception();
throw sls::RuntimeError("Could not create listener thread");
FILE_LOG(logDEBUG) << "Listener " << ind << " created";
}
@@ -66,12 +66,10 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
Listener::~Listener() {
if (udpSocket){
delete udpSocket;
sem_post(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
if (carryOverPacket) delete [] carryOverPacket;
if (listeningPacket) delete [] listeningPacket;
ThreadObject::DestroyThread();
}
@@ -111,7 +109,7 @@ void Listener::StopRunning() {
}
void Listener::SetFifo(Fifo*& f) {
void Listener::SetFifo(Fifo* f) {
fifo = f;
}
@@ -130,14 +128,10 @@ void Listener::ResetParametersforNewMeasurement() {
numPacketsCaught = 0;
firstMeasurementIndex = 0;
carryOverFlag = false;
if (carryOverPacket)
delete [] carryOverPacket;
carryOverPacket = new char[generalData->packetSize];
memset(carryOverPacket,0,generalData->packetSize);
if (listeningPacket)
delete [] listeningPacket;
listeningPacket = new char[generalData->packetSize];
memset(listeningPacket,0,generalData->packetSize);
carryOverPacket = sls::make_unique<char[]>(generalData->packetSize);
memset(carryOverPacket.get(),0,generalData->packetSize);
listeningPacket = sls::make_unique<char[]>(generalData->packetSize);
memset(carryOverPacket.get(),0,generalData->packetSize);
numPacketsStatistic = 0;
numFramesStatistic = 0;
@@ -162,19 +156,18 @@ void Listener::RecordFirstIndices(uint64_t fnum) {
}
if(!(*silentMode)) {
if (!index) cprintf(BLUE,"%d First Acquisition Index:%lu\n"
"%d First Measurement Index:%lu\n",
index, firstAcquisitionIndex,
index, firstMeasurementIndex);
if (!index) {
FILE_LOG(logINFOBLUE) << index <<
" First Acquisition Index: " << firstAcquisitionIndex;
FILE_LOG(logDEBUG1) << index << " First Measurement Index: " << firstMeasurementIndex;
}
}
}
void Listener::SetGeneralData(GeneralData*& g) {
void Listener::SetGeneralData(GeneralData* g) {
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
}
@@ -194,7 +187,7 @@ int Listener::CreateUDPSockets() {
}
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
if (strchr(eth,'.') != nullptr){
memset(eth, 0, MAX_STR_LENGTH);
}
if(!strlen(eth)){
@@ -204,10 +197,9 @@ int Listener::CreateUDPSockets() {
ShutDownUDPSocket();
try{
genericSocket* g = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
udpSocket = sls::make_unique<genericSocket>(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:nullptr), generalData->headerPacketSize,
*udpSocketBufferSize);
udpSocket = g;
FILE_LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber;
} catch (...) {
FILE_LOG(logERROR) << "Could not create UDP socket on port " << *udpPortNumber;
@@ -234,15 +226,13 @@ void Listener::ShutDownUDPSocket() {
// wait only if the threads have started as it is the threads that
//give a post to semaphore(at stopListening)
if (runningFlag)
sem_wait(&semaphore_socket);
delete udpSocket;
udpSocket = 0;
sem_wait(&semaphore_socket);
sem_destroy(&semaphore_socket);
}
}
int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
int Listener::CreateDummySocketForUDPSocketBufferSize(int64_t s) {
FILE_LOG(logINFO) << "Testing UDP Socket Buffer size with test port " << *udpPortNumber;
if (!(*activated)) {
@@ -250,47 +240,32 @@ int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
return OK;
}
uint32_t temp = *udpSocketBufferSize;
int64_t temp = *udpSocketBufferSize;
*udpSocketBufferSize = s;
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
if (strchr(eth,'.') != nullptr){
memset(eth, 0, MAX_STR_LENGTH);
}
// shutdown if any open
if(udpSocket){
udpSocket->ShutDownSocket();
delete udpSocket;
udpSocket = 0;
}
//create dummy socket
try {
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:NULL), generalData->headerPacketSize,
genericSocket g(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, (strlen(eth)?eth:nullptr), generalData->headerPacketSize,
*udpSocketBufferSize);
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = g.getActualUDPSocketBufferSize();
if (*actualUDPSocketBufferSize != (s*2)) {
*udpSocketBufferSize = temp;
}
} catch (...) {
FILE_LOG(logERROR) << "Could not create a test UDP socket on port " << *udpPortNumber;
return FAIL;
}
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getActualUDPSocketBufferSize();
if (*actualUDPSocketBufferSize != (s*2)) {
*udpSocketBufferSize = temp;
}
// shutdown socket
if(udpSocket){
udpSocketAlive = false;
udpSocket->ShutDownSocket();
delete udpSocket;
udpSocket = 0;
}
return OK;
}
@@ -304,9 +279,8 @@ void Listener::ThreadExecution() {
int rc = 0;
fifo->GetNewAddress(buffer);
#ifdef FIFODEBUG
cprintf(GREEN,"Listener %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
FILE_LOG(logDEBUG5) << "Listener " << index << ", "
"pop 0x" << std::hex << (void*)(buffer) << std::dec << ":" << buffer;
//udpsocket doesnt exist
if (*activated && !udpSocketAlive && !carryOverFlag) {
@@ -324,7 +298,6 @@ void Listener::ThreadExecution() {
//error check, (should not be here) if not transmitting yet (previous if) rc should be > 0
if (rc == 0) {
//cprintf(RED,"%d Socket shut down while waiting for future packet. udpsocketalive:%d\n",index, udpSocketAlive );
if (!udpSocketAlive) {
(*((uint32_t*)buffer)) = 0;
StopListening(buffer);
@@ -366,10 +339,8 @@ void Listener::StopListening(char* buf) {
StopRunning();
sem_post(&semaphore_socket);
#ifdef VERBOSE
cprintf(GREEN,"%d: Listening Packets (%u) : %llu\n", index, *udpPortNumber, numPacketsCaught);
cprintf(GREEN,"%d: Listening Completed\n", index);
#endif
FILE_LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber << ") : " << numPacketsCaught;
FILE_LOG(logDEBUG1) << index << ": Listening Completed";
}
@@ -386,8 +357,8 @@ uint32_t Listener::ListenToAnImage(char* buf) {
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
uint32_t pperFrame = generalData->packetsPerFrame;
bool isHeaderEmpty = true;
sls_detector_header* old_header = 0;
sls_receiver_header* new_header = 0;
sls_detector_header* old_header = nullptr;
sls_receiver_header* new_header = nullptr;
bool standardheader = generalData->standardheader;
uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - generalData->imageSize);
@@ -422,23 +393,24 @@ uint32_t Listener::ListenToAnImage(char* buf) {
//look for carry over
if (carryOverFlag) {
//cprintf(RED,"%d carry flag\n",index);
FILE_LOG(logDEBUG3) << index << "carry flag";
//check if its the current image packet
// -------------------------- new header ----------------------------------------------------------------------
if (standardheader) {
old_header = (sls_detector_header*) (carryOverPacket + esize);
old_header = (sls_detector_header*) (&carryOverPacket[esize]);
fnum = old_header->frameNumber;
pnum = old_header->packetNumber;
}
// -------------------old header -----------------------------------------------------------------------------
else {
generalData->GetHeaderInfo(index, carryOverPacket + esize,
generalData->GetHeaderInfo(index, &carryOverPacket[esize],
*dynamicRange, oddStartingPacket, fnum, pnum, snum, bid);
}
//------------------------------------------------------------------------------------------------------------
if (fnum != currentFrameIndex) {
if (fnum < currentFrameIndex) {
cprintf(RED,"Error:(Weird), With carry flag: Frame number %lu less than current frame number %lu\n", fnum, currentFrameIndex);
FILE_LOG(logERROR) << "(Weird), With carry flag: Frame number " <<
fnum << " less than current frame number " << currentFrameIndex;
return 0;
}
switch(*frameDiscardMode) {
@@ -465,24 +437,25 @@ uint32_t Listener::ListenToAnImage(char* buf) {
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if(!pnum)
memcpy(buf + fifohsize , carryOverPacket + hsize+4, dsize-2);
memcpy(buf + fifohsize , &carryOverPacket[hsize+4], dsize-2);
else
memcpy(buf + fifohsize + dsize - 2, carryOverPacket + hsize, dsize+2);
memcpy(buf + fifohsize + dsize - 2, &carryOverPacket[hsize], dsize+2);
break;
case JUNGFRAUCTB:
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame-1))
memcpy(buf + fifohsize + (pnum * dsize), carryOverPacket + hsize, corrected_dsize);
memcpy(buf + fifohsize + (pnum * dsize), &carryOverPacket[hsize], corrected_dsize);
else
memcpy(buf + fifohsize + (pnum * dsize), carryOverPacket + hsize, dsize);
memcpy(buf + fifohsize + (pnum * dsize), &carryOverPacket[hsize], dsize);
break;
default:
memcpy(buf + fifohsize + (pnum * dsize), carryOverPacket + hsize, dsize);
memcpy(buf + fifohsize + (pnum * dsize), &carryOverPacket[hsize], dsize);
break;
}
carryOverFlag = false;
++numpackets; //number of packets in this image (each time its copied to buf)
new_header->packetsMask[pnum] = 1;
new_header->packetsMask[((pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS)] = 1;
//writer header
if(isHeaderEmpty) {
@@ -512,7 +485,7 @@ uint32_t Listener::ListenToAnImage(char* buf) {
//listen to new packet
rc = 0;
if (udpSocketAlive){
rc = udpSocket->ReceiveDataOnly(listeningPacket);
rc = udpSocket->ReceiveDataOnly(&listeningPacket[0]);
}
// end of acquisition
if(rc <= 0) {
@@ -542,51 +515,49 @@ uint32_t Listener::ListenToAnImage(char* buf) {
// -------------------------- new header ----------------------------------------------------------------------
if (standardheader) {
old_header = (sls_detector_header*) (listeningPacket + esize);
old_header = (sls_detector_header*) (&listeningPacket[esize]);
fnum = old_header->frameNumber;
pnum = old_header->packetNumber;
}
// -------------------old header -----------------------------------------------------------------------------
else {
// set first packet to be odd or even (check required when switching from roi to no roi)
if (myDetectorType == GOTTHARD && !measurementStartedFlag) {
oddStartingPacket = generalData->SetOddStartingPacket(index, listeningPacket + esize);
}
// set first packet to be odd or even (check required when switching from roi to no roi)
if (myDetectorType == GOTTHARD && !measurementStartedFlag) {
oddStartingPacket = generalData->SetOddStartingPacket(index, &listeningPacket[esize]);
}
generalData->GetHeaderInfo(index, listeningPacket + esize,
generalData->GetHeaderInfo(index, &listeningPacket[esize],
*dynamicRange, oddStartingPacket, fnum, pnum, snum, bid);
}
//------------------------------------------------------------------------------------------------------------
// Eiger Firmware in a weird state
if (myDetectorType == EIGER && fnum == 0) {
cprintf(RED,"[%u]: Got Frame Number Zero from Firmware. Discarding Packet\n", *udpPortNumber);
FILE_LOG(logERROR) << "[" << *udpPortNumber << "]: Got Frame Number "
"Zero from Firmware. Discarding Packet";
numPacketsCaught--;
return 0;
}
lastCaughtFrameIndex = fnum;
FILE_LOG(logDEBUG5) << "Listening " << index << ": currentfindex:" << currentFrameIndex <<
", fnum:" << fnum << ", pnum:" << pnum << ", numpackets:" << numpackets;
//#ifdef VERBOSE
//if (!index)
cprintf(GREEN,"Listening %d: currentfindex:%lu, fnum:%lu, pnum:%u numpackets:%u\n",
index,currentFrameIndex, fnum, pnum, numpackets);
//#endif
if (!measurementStartedFlag)
RecordFirstIndices(fnum);
if (pnum >= pperFrame ) {
cprintf(RED,"bad packet, throwing away. packets caught so far: %d\n", numpackets);
return 0; // bad packet
}
if (pnum >= pperFrame ) {
FILE_LOG(logERROR) << "Bad packet " << pnum <<
"(fnum: " << fnum << "), throwing away. "
"Packets caught so far: " << numpackets;
return 0; // bad packet
}
//future packet by looking at image number (all other detectors)
if (fnum != currentFrameIndex) {
//cprintf(RED,"setting carry over flag to true num:%llu nump:%u\n",fnum, numpackets );
carryOverFlag = true;
memcpy(carryOverPacket,listeningPacket, generalData->packetSize);
memcpy(carryOverPacket.get(), &listeningPacket[0], generalData->packetSize);
switch(*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
@@ -612,23 +583,23 @@ uint32_t Listener::ListenToAnImage(char* buf) {
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if(!pnum)
memcpy(buf + fifohsize + (pnum * dsize), listeningPacket + hsize+4, dsize-2);
memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize+4], dsize-2);
else
memcpy(buf + fifohsize + (pnum * dsize) - 2, listeningPacket + hsize, dsize+2);
memcpy(buf + fifohsize + (pnum * dsize) - 2, &listeningPacket[hsize], dsize+2);
break;
case JUNGFRAUCTB:
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame-1))
memcpy(buf + fifohsize + (pnum * dsize), listeningPacket + hsize, corrected_dsize);
memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize], corrected_dsize);
else
memcpy(buf + fifohsize + (pnum * dsize), listeningPacket + hsize, dsize);
memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize], dsize);
break;
default:
memcpy(buf + fifohsize + (pnum * dsize), listeningPacket + hsize, dsize);
memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize], dsize);
break;
}
++numpackets; //number of packets in this image (each time its copied to buf)
new_header->packetsMask[pnum] = 1;
new_header->packetsMask[((pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS)] = 1;
if(isHeaderEmpty) {
// -------------------------- new header ----------------------------------------------------------------------
@@ -657,19 +628,18 @@ uint32_t Listener::ListenToAnImage(char* buf) {
void Listener::PrintFifoStatistics() {
#ifdef VERBOSE
cout << "numFramesStatistic:" << numFramesStatistic << " numPacketsStatistic:" << numPacketsStatistic << endl;
#endif
FILE_LOG(logDEBUG1) << "numFramesStatistic:" << numFramesStatistic << " numPacketsStatistic:" << numPacketsStatistic;
//calculate packet loss
int64_t loss = -1;
loss = (numFramesStatistic*(generalData->packetsPerFrame)) - numPacketsStatistic;
int64_t loss = (numFramesStatistic*(generalData->packetsPerFrame)) - numPacketsStatistic;
numPacketsStatistic = 0;
numFramesStatistic = 0;
if (loss)
cprintf(RED,"[%u]: Packet_Loss:%lu Used_Fifo_Max_Level:%d \tFree_Slots_Min_Level:%d \tCurrent_Frame#:%lu\n",
*udpPortNumber,loss, fifo->GetMaxLevelForFifoBound() , fifo->GetMinLevelForFifoFree(), currentFrameIndex);
else
cprintf(GREEN,"[%u]: Packet_Loss:%lu Used_Fifo_Max_Level:%d \tFree_Slots_Min_Level:%d \tCurrent_Frame#:%lu\n",
*udpPortNumber,loss, fifo->GetMaxLevelForFifoBound(), fifo->GetMinLevelForFifoFree(), currentFrameIndex);
const auto color = loss ? logINFORED : logINFOGREEN;
FILE_LOG(color) << "[" << *udpPortNumber << "]: "
"Packet_Loss:" << loss <<
" Used_Fifo_Max_Level:" << fifo->GetMaxLevelForFifoBound() <<
" \tFree_Slots_Min_Level:" << fifo->GetMinLevelForFifoFree() <<
" \tCurrent_Frame#:" << currentFrameIndex;
}

View File

@@ -1,46 +0,0 @@
//version 1.0, base development, Ian 19/01/09
#include "MySocketTCP.h"
#include <string.h>
#include <iostream>
#include <cstdio>
int MySocketTCP::SendData(void* buf,int length){//length in characters
int ndata = SendDataAndKeepConnection(buf,length);
Disconnect();
return ndata;
}
int MySocketTCP::SendDataAndKeepConnection(void* buf,int length){//length in characters
if(last_keep_connection_open_action_was_a_send) Disconnect(); //to keep a structured data flow;
Connect();
int total_sent=SendDataOnly(buf,length);
last_keep_connection_open_action_was_a_send=1;
return total_sent;
}
int MySocketTCP::ReceiveData(void* buf,int length){//length in characters
int ndata = ReceiveDataAndKeepConnection(buf,length);
Disconnect();
return ndata;
}
int MySocketTCP::ReceiveDataAndKeepConnection(void* buf,int length){//length in characters
if(!last_keep_connection_open_action_was_a_send) Disconnect(); //to a keep structured data flow;
Connect();
// should preform two reads one to receive incomming char count
int total_received=ReceiveDataOnly(buf,length);
last_keep_connection_open_action_was_a_send=0;
return total_received;
}

12
slsReceiverSoftware/src/ThreadObject.cpp Normal file → Executable file
View File

@@ -39,7 +39,7 @@ void ThreadObject::DestroyThread() {
if(alive){
killThread = true;
sem_post(&semaphore);
pthread_join(thread,NULL);
pthread_join(thread,nullptr);
sem_destroy(&semaphore);
killThread = false;
alive = false;
@@ -56,7 +56,7 @@ int ThreadObject::CreateThread() {
sem_init(&semaphore,1,0);
killThread = false;
if(pthread_create(&thread, NULL,StartThread, (void*) this)){
if(pthread_create(&thread, nullptr,StartThread, (void*) this)){
FILE_LOG(logERROR) << "Could not create " << GetType() << " thread with index " << index;
return FAIL;
}
@@ -74,7 +74,8 @@ void* ThreadObject::StartThread(void* thisPointer) {
void ThreadObject::RunningThread() {
cprintf(BLUE,"Created [ %s Thread %d, Tid: %ld ]\n", GetType().c_str(),index, (long)syscall(SYS_gettid));
FILE_LOG(logINFOBLUE) << "Created [ " << GetType() << "Thread " << index << ", "
"Tid: " << syscall(SYS_gettid) << "]";
while(true) {
while(IsRunning()) {
@@ -88,8 +89,9 @@ void ThreadObject::RunningThread() {
sem_wait(&semaphore);
if(killThread) {
cprintf(BLUE,"Exiting [ %s Thread %d, Tid: %ld ]\n", GetType().c_str(),index, (long)syscall(SYS_gettid));
pthread_exit(NULL);
FILE_LOG(logINFOBLUE) << "Exiting [ " << GetType() <<
" Thread " << index << ", Tid: " << syscall(SYS_gettid) << "]";
pthread_exit(nullptr);
}
}//end of outer loop

View File

@@ -1,825 +0,0 @@
//#ifdef SLS_RECEIVER_UDP_FUNCTIONS
/********************************************//**
* @file UDPBaseImplementation.cpp
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include "UDPBaseImplementation.h"
#include <sys/stat.h> // stat
#include <iostream>
#include <string.h>
/*************************************************************************
* Constructor & Destructor **********************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
UDPBaseImplementation::UDPBaseImplementation(){
FILE_LOG(logDEBUG) << __AT__ << " starting";
initializeMembers();
//***callback parameters***
startAcquisitionCallBack = NULL;
pStartAcquisition = NULL;
acquisitionFinishedCallBack = NULL;
pAcquisitionFinished = NULL;
rawDataReadyCallBack = NULL;
rawDataModifyReadyCallBack = NULL;
pRawDataReady = NULL;
}
void UDPBaseImplementation::initializeMembers(){
FILE_LOG(logDEBUG) << __AT__ << " starting";
FILE_LOG(logDEBUG) << "Info: Initializing base members";
//**detector parameters***
for (int i = 0; i < MAX_DIMENSIONS; ++i)
numDet[i] = 0;
detID = 0;
myDetectorType = GENERIC;
strcpy(detHostname,"");
acquisitionPeriod = 0;
acquisitionTime = 0;
subExpTime = 0;
subPeriod = 0;
numberOfFrames = 0;
numberOfSamples = 0;
dynamicRange = 16;
tengigaEnable = false;
fifoDepth = 0;
flippedData[0] = 0;
flippedData[1] = 0;
gapPixelsEnable = false;
//***receiver parameters***
status = IDLE;
activated = true;
deactivatedPaddingEnable = true;
frameDiscardMode = NO_DISCARD;
framePadding = false;
//***connection parameters***
strcpy(eth,"");
for(int i=0;i<MAX_NUMBER_OF_LISTENING_THREADS;i++){
udpPortNum[i] = DEFAULT_UDP_PORTNO + i;
}
udpSocketBufferSize = 0;
actualUDPSocketBufferSize = 0;
//***file parameters***
fileFormatType = BINARY;
strcpy(fileName,"run");
strcpy(filePath,"");
fileIndex = 0;
framesPerFile = 0;
scanTag = 0;
fileWriteEnable = true;
overwriteEnable = true;
dataCompressionEnable = false;
//***acquisition parameters***
roi.clear();
frameToGuiFrequency = 0;
frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER_IN_MS;
dataStreamEnable = false;
streamingPort = 0;
memset(streamingSrcIP, 0, sizeof(streamingSrcIP));
memset(additionalJsonHeader, 0, sizeof(additionalJsonHeader));
//***receiver parameters***
silentMode = false;
}
UDPBaseImplementation::~UDPBaseImplementation(){}
/*************************************************************************
* Getters ***************************************************************
* They access local cache of configuration or detector parameters *******
*************************************************************************/
/**initial parameters***/
int* UDPBaseImplementation::getMultiDetectorSize() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return (int*) numDet;
}
int UDPBaseImplementation::getDetectorPositionId() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return detID;
}
char *UDPBaseImplementation::getDetectorHostname() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
//not initialized
if(!strlen(detHostname))
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,detHostname);
//freed by calling function
return output;
}
int UDPBaseImplementation::getFlippedData(int axis) const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(axis<0 || axis > 1) return -1;
return flippedData[axis];
}
bool UDPBaseImplementation::getGapPixelsEnable() const {
FILE_LOG(logDEBUG) << __AT__ << " starting";
return gapPixelsEnable;
}
/***file parameters***/
slsReceiverDefs::fileFormat UDPBaseImplementation::getFileFormat() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileFormatType;
}
char *UDPBaseImplementation::getFileName() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
//not initialized
if(!strlen(fileName))
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,fileName);
//freed by calling function
return output;
}
char *UDPBaseImplementation::getFilePath() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
//not initialized
if(!strlen(filePath))
return NULL;
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,filePath);
//freed by calling function
return output;
}
uint64_t UDPBaseImplementation::getFileIndex() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileIndex;
}
uint32_t UDPBaseImplementation::getFramesPerFile() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return framesPerFile;
}
slsReceiverDefs::frameDiscardPolicy UDPBaseImplementation::getFrameDiscardPolicy() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameDiscardMode;
}
bool UDPBaseImplementation::getFramePaddingEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return framePadding;
}
int UDPBaseImplementation::getScanTag() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return scanTag;
}
bool UDPBaseImplementation::getFileWriteEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fileWriteEnable;
}
bool UDPBaseImplementation::getOverwriteEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return overwriteEnable;
}
bool UDPBaseImplementation::getDataCompressionEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dataCompressionEnable;
}
/***acquisition count parameters***/
uint64_t UDPBaseImplementation::getTotalFramesCaught() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return 0;
}
uint64_t UDPBaseImplementation::getFramesCaught() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return 0;
}
int64_t UDPBaseImplementation::getAcquisitionIndex() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return -1;
}
/***connection parameters***/
uint32_t UDPBaseImplementation::getUDPPortNumber() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpPortNum[0];
}
uint32_t UDPBaseImplementation::getUDPPortNumber2() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpPortNum[1];
}
char *UDPBaseImplementation::getEthernetInterface() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,eth);
//freed by calling function
return output;
}
/***acquisition parameters***/
std::vector<slsReceiverDefs::ROI> UDPBaseImplementation::getROI() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return roi;
}
uint32_t UDPBaseImplementation::getFrameToGuiFrequency() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameToGuiFrequency;
}
uint32_t UDPBaseImplementation::getFrameToGuiTimer() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return frameToGuiTimerinMS;
}
bool UDPBaseImplementation::getDataStreamEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dataStreamEnable;
}
uint64_t UDPBaseImplementation::getAcquisitionPeriod() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return acquisitionPeriod;
}
uint64_t UDPBaseImplementation::getAcquisitionTime() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return acquisitionTime;
}
uint64_t UDPBaseImplementation::getSubExpTime() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return subExpTime;
}
uint64_t UDPBaseImplementation::getSubPeriod() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return subPeriod;
}
uint64_t UDPBaseImplementation::getNumberOfFrames() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return numberOfFrames;
}
uint64_t UDPBaseImplementation::getNumberofSamples() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return numberOfSamples;
}
uint32_t UDPBaseImplementation::getDynamicRange() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return dynamicRange;}
bool UDPBaseImplementation::getTenGigaEnable() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return tengigaEnable;
}
uint32_t UDPBaseImplementation::getFifoDepth() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return fifoDepth;
}
/***receiver status***/
slsReceiverDefs::runStatus UDPBaseImplementation::getStatus() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return status;}
bool UDPBaseImplementation::getSilentMode() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return silentMode;}
bool UDPBaseImplementation::getActivate() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return activated;
}
bool UDPBaseImplementation::getDeactivatedPadding() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return deactivatedPaddingEnable;
}
uint32_t UDPBaseImplementation::getStreamingPort() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return streamingPort;
}
char *UDPBaseImplementation::getStreamingSourceIP() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
char* output = new char[MAX_STR_LENGTH]();
strcpy(output,streamingSrcIP);
//freed by calling function
return output;
}
char *UDPBaseImplementation::getAdditionalJsonHeader() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
char* output = new char[MAX_STR_LENGTH]();
memset(output, 0, MAX_STR_LENGTH);
strcpy(output,additionalJsonHeader);
//freed by calling function
return output;
}
uint32_t UDPBaseImplementation::getUDPSocketBufferSize() const {
FILE_LOG(logDEBUG) << __AT__ << " starting";
return udpSocketBufferSize;
}
uint32_t UDPBaseImplementation::getActualUDPSocketBufferSize() const {
FILE_LOG(logDEBUG) << __AT__ << " starting";
return actualUDPSocketBufferSize;
}
/*************************************************************************
* Setters ***************************************************************
* They modify the local cache of configuration or detector parameters ***
*************************************************************************/
/**initial parameters***/
void UDPBaseImplementation::configure(std::map<std::string, std::string> config_map){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
void UDPBaseImplementation::setMultiDetectorSize(const int* size) {
FILE_LOG(logDEBUG) << __AT__ << " starting";
char message[100];
strcpy(message, "Detector Size: (");
for (int i = 0; i < MAX_DIMENSIONS; ++i) {
if (myDetectorType == EIGER && (!i))
numDet[i] = size[i]*2;
else
numDet[i] = size[i];
sprintf(message,"%s%d",message,numDet[i]);
if (i < MAX_DIMENSIONS-1 )
strcat(message,",");
}
strcat(message,")");
FILE_LOG(logINFO) << message;
}
void UDPBaseImplementation::setFlippedData(int axis, int enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(axis<0 || axis>1) return;
flippedData[axis] = enable==0?0:1;
FILE_LOG(logINFO) << "Flipped Data: " << flippedData[0] << " , " << flippedData[1];
}
int UDPBaseImplementation::setGapPixelsEnable(const bool b) {
FILE_LOG(logDEBUG) << __AT__ << " starting";
gapPixelsEnable = b;
FILE_LOG(logINFO) << "Gap Pixels Enable: " << gapPixelsEnable;
// overridden
return OK;
}
/***file parameters***/
void UDPBaseImplementation::setFileFormat(const fileFormat f){
FILE_LOG(logDEBUG) << __AT__ << " starting";
switch(f){
#ifdef HDF5C
case HDF5:
fileFormatType = HDF5;
break;
#endif
default:
fileFormatType = BINARY;
break;
}
FILE_LOG(logINFO) << "File Format: " << getFileFormatType(fileFormatType);
}
void UDPBaseImplementation::setFileName(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(strlen(c))
strcpy(fileName, c);
FILE_LOG(logINFO) << "File name: " << fileName;
}
void UDPBaseImplementation::setFilePath(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(strlen(c)){
//check if filepath exists
struct stat st;
if(stat(c,&st) == 0)
strcpy(filePath,c);
else
FILE_LOG(logERROR) << "FilePath does not exist: " << filePath;
}
FILE_LOG(logINFO) << "File path: " << filePath;
}
void UDPBaseImplementation::setFileIndex(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
fileIndex = i;
FILE_LOG(logINFO) << "File Index: " << fileIndex;
}
void UDPBaseImplementation::setFramesPerFile(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
framesPerFile = i;
FILE_LOG(logINFO) << "Frames per file: " << framesPerFile;
}
void UDPBaseImplementation::setFrameDiscardPolicy(const frameDiscardPolicy i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if (i >= 0 && i < NUM_DISCARD_POLICIES)
frameDiscardMode = i;
FILE_LOG(logINFO) << "Frame Discard Policy: " << getFrameDiscardPolicyType(frameDiscardMode);
}
void UDPBaseImplementation::setFramePaddingEnable(const bool i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
framePadding = i;
FILE_LOG(logINFO) << "Frame Padding: " << framePadding;
}
//FIXME: needed?
void UDPBaseImplementation::setScanTag(const int i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
scanTag = i;
FILE_LOG(logINFO) << "Scan Tag: " << scanTag;
}
void UDPBaseImplementation::setFileWriteEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting";
fileWriteEnable = b;
FILE_LOG(logINFO) << "File Write Enable: " << stringEnable(fileWriteEnable);
}
void UDPBaseImplementation::setOverwriteEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting";
overwriteEnable = b;
FILE_LOG(logINFO) << "Overwrite Enable: " << stringEnable(overwriteEnable);
}
int UDPBaseImplementation::setDataCompressionEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting";
dataCompressionEnable = b;
FILE_LOG(logINFO) << "Data Compression : " << stringEnable(dataCompressionEnable);
//overridden methods might return FAIL
return OK;
}
/***connection parameters***/
void UDPBaseImplementation::setUDPPortNumber(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
udpPortNum[0] = i;
FILE_LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0];
}
void UDPBaseImplementation::setUDPPortNumber2(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
udpPortNum[1] = i;
FILE_LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1];
}
void UDPBaseImplementation::setEthernetInterface(const char* c){
FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(eth, c);
FILE_LOG(logINFO) << "Ethernet Interface: " << eth;
}
/***acquisition parameters***/
int UDPBaseImplementation::setROI(const std::vector<slsReceiverDefs::ROI> i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
roi = i;
std::stringstream sstm;
sstm << "ROI: ";
if (!roi.size())
sstm << "0";
else {
for (unsigned int i = 0; i < roi.size(); ++i) {
sstm << "( " <<
roi[i].xmin << ", " <<
roi[i].xmax << ", " <<
roi[i].ymin << ", " <<
roi[i].ymax << " )";
}
}
std::string message = sstm.str();
FILE_LOG(logINFO) << message;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setFrameToGuiFrequency(const uint32_t freq){
FILE_LOG(logDEBUG) << __AT__ << " starting";
frameToGuiFrequency = freq;
FILE_LOG(logINFO) << "Frame To Gui Frequency: " << frameToGuiFrequency;
//overrridden child classes might return FAIL
return OK;
}
void UDPBaseImplementation::setFrameToGuiTimer(const uint32_t time_in_ms){
FILE_LOG(logDEBUG) << __AT__ << " starting";
frameToGuiTimerinMS = time_in_ms;
FILE_LOG(logINFO) << "Frame To Gui Timer: " << frameToGuiTimerinMS;
}
int UDPBaseImplementation::setDataStreamEnable(const bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
dataStreamEnable = enable;
FILE_LOG(logINFO) << "Streaming Data from Receiver: " << dataStreamEnable;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setAcquisitionPeriod(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
acquisitionPeriod = i;
FILE_LOG(logINFO) << "Acquisition Period: " << (double)acquisitionPeriod/(1E9) << "s";
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setAcquisitionTime(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
acquisitionTime = i;
FILE_LOG(logINFO) << "Acquisition Time: " << (double)acquisitionTime/(1E9) << "s";
//overrridden child classes might return FAIL
return OK;
}
void UDPBaseImplementation::setSubExpTime(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
subExpTime = i;
FILE_LOG(logINFO) << "Sub Exposure Time: " << (double)subExpTime/(1E9) << "s";
}
void UDPBaseImplementation::setSubPeriod(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
subPeriod = i;
FILE_LOG(logINFO) << "Sub Exposure Time: " << (double)subPeriod/(1E9) << "s";
}
int UDPBaseImplementation::setNumberOfFrames(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
numberOfFrames = i;
FILE_LOG(logINFO) << "Number of Frames: " << numberOfFrames;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setNumberofSamples(const uint64_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
numberOfSamples = i;
FILE_LOG(logINFO) << "Number of Samples: " << numberOfSamples;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setDynamicRange(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
dynamicRange = i;
FILE_LOG(logINFO) << "Dynamic Range: " << dynamicRange;
//overrridden child classes might return FAIL
return OK;
}
int UDPBaseImplementation::setTenGigaEnable(const bool b){
FILE_LOG(logDEBUG) << __AT__ << " starting";
tengigaEnable = b;
FILE_LOG(logINFO) << "Ten Giga Enable: " << stringEnable(tengigaEnable);
//overridden functions might return FAIL
return OK;
}
int UDPBaseImplementation::setFifoDepth(const uint32_t i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
fifoDepth = i;
FILE_LOG(logINFO) << "Fifo Depth: " << i;
//overridden functions might return FAIL
return OK;
}
/***receiver parameters***/
void UDPBaseImplementation::setSilentMode(const bool i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
silentMode = i;
FILE_LOG(logINFO) << "Silent Mode: " << i;
}
/*************************************************************************
* Behavioral functions***************************************************
* They may modify the status of the receiver ****************************
*************************************************************************/
/***initial functions***/
int UDPBaseImplementation::setDetectorType(const detectorType d){
FILE_LOG(logDEBUG) << __AT__ << " starting";
myDetectorType = d;
//if eiger, set numberofListeningThreads = 2;
FILE_LOG(logINFO) << "Detector Type: " << getDetectorType(d);
return OK;
}
void UDPBaseImplementation::setDetectorPositionId(const int i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
detID = i;
FILE_LOG(logINFO) << "Detector Position Id: " << detID;
}
void UDPBaseImplementation::initialize(const char *c){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(strlen(c))
strcpy(detHostname, c);
FILE_LOG(logINFO) << "Detector Hostname: " << detHostname;
}
/***acquisition functions***/
void UDPBaseImplementation::resetAcquisitionCount(){
FILE_LOG(logDEBUG) << __AT__ << " starting";
//overriden by resetting of new acquisition parameters
}
int UDPBaseImplementation::startReceiver(char *c){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
return OK;
}
void UDPBaseImplementation::stopReceiver(){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
void UDPBaseImplementation::startReadout(){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
void UDPBaseImplementation::shutDownUDPSockets(){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
//FIXME: needed, isnt stopReceiver enough?
void UDPBaseImplementation::abort(){
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
}
bool UDPBaseImplementation::setActivate(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
return activated;
}
bool UDPBaseImplementation::setDeactivatedPadding(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
deactivatedPaddingEnable = enable;
FILE_LOG(logINFO) << "Deactivated Padding Enable: " << stringEnable(deactivatedPaddingEnable);
return deactivatedPaddingEnable;
}
void UDPBaseImplementation::setStreamingPort(const uint32_t i) {
streamingPort = i;
FILE_LOG(logINFO) << "Streaming Port: " << streamingPort;
}
void UDPBaseImplementation::setStreamingSourceIP(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(streamingSrcIP, c);
FILE_LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP;
}
void UDPBaseImplementation::setAdditionalJsonHeader(const char c[]){
FILE_LOG(logDEBUG) << __AT__ << " starting";
strcpy(additionalJsonHeader, c);
FILE_LOG(logINFO) << "Additional JSON Header: " << additionalJsonHeader;
}
int UDPBaseImplementation::setUDPSocketBufferSize(const uint32_t s) {
FILE_LOG(logDEBUG) << __AT__ << " starting";
udpSocketBufferSize = s;
return OK;
}
int UDPBaseImplementation::restreamStop() {
FILE_LOG(logERROR) << __AT__ << " doing nothing...";
FILE_LOG(logERROR) << __AT__ << " must be overridden by child classes";
return OK;
}
/***callback functions***/
void UDPBaseImplementation::registerCallBackStartAcquisition(int (*func)(char*, char*, uint64_t, uint32_t, void*),void *arg){
startAcquisitionCallBack=func;
pStartAcquisition=arg;
}
void UDPBaseImplementation::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void*),void *arg){
acquisitionFinishedCallBack=func;
pAcquisitionFinished=arg;
}
void UDPBaseImplementation::registerCallBackRawDataReady(void (*func)(char* ,
char*, uint32_t, void*),void *arg){
rawDataReadyCallBack=func;
pRawDataReady=arg;
}
void UDPBaseImplementation::registerCallBackRawDataModifyReady(void (*func)(char* ,
char*, uint32_t&, void*),void *arg){
rawDataModifyReadyCallBack=func;
pRawDataReady=arg;
}
//#endif

View File

@@ -1,30 +0,0 @@
//#ifdef SLS_RECEIVER_UDP_FUNCTIONS
/********************************************//**
* @file slsReceiverUDPFunctions.cpp
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include <iostream>
#include <string.h>
#include "UDPInterface.h"
#include "UDPBaseImplementation.h"
#include "UDPStandardImplementation.h"
UDPInterface * UDPInterface::create(std::string receiver_type){
if (receiver_type == "standard"){
FILE_LOG(logINFO) << "Starting " << receiver_type;
return new UDPStandardImplementation();
}
else{
FILE_LOG(logERROR) << "UDP interface not supported, using base implementation";
return new UDPBaseImplementation();
}
}
//#endif

View File

@@ -1,822 +0,0 @@
/********************************************//**
* @file UDPStandardImplementation.cpp
* @short does all the functions for a receiver, set/get parameters, start/stop etc.
***********************************************/
#include "UDPStandardImplementation.h"
#include "GeneralData.h"
#include "Listener.h"
#include "DataProcessor.h"
#include "DataStreamer.h"
#include "Fifo.h"
#include "ZmqSocket.h" //just for the zmq port define
#include <cstdlib> //system
#include <cstring> //strcpy
#include <errno.h> //eperm
#include <fstream>
/** cosntructor & destructor */
UDPStandardImplementation::UDPStandardImplementation() {
InitializeMembers();
}
UDPStandardImplementation::~UDPStandardImplementation() {
DeleteMembers();
}
void UDPStandardImplementation::DeleteMembers() {
if (generalData) { delete generalData; generalData=0;}
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
}
void UDPStandardImplementation::InitializeMembers() {
UDPBaseImplementation::initializeMembers();
acquisitionPeriod = SAMPLE_TIME_IN_NS;
//*** receiver parameters ***
numThreads = 1;
numberofJobs = 1;
nroichannels = 0;
//** class objects ***
generalData = 0;
}
/*** Overloaded Functions called by TCP Interface ***/
uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
std::vector<DataProcessor*>::const_iterator it;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetMeasurementStartedFlag() ? 1 : 0);
sum += (*it)->GetNumTotalFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
return (sum/dataProcessor.size());
}
uint64_t UDPStandardImplementation::getFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetNumFramesCaught();
}
//no data processed
if (flagsum != dataProcessor.size()) return 0;
return (sum/dataProcessor.size());
}
int64_t UDPStandardImplementation::getAcquisitionIndex() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
flagsum += ((*it)->GetAcquisitionStartedFlag() ? 1 : 0);
sum += (*it)->GetActualProcessedAcquisitionIndex();
}
//no data processed
if (flagsum != dataProcessor.size()) return -1;
return (sum/dataProcessor.size());
}
int UDPStandardImplementation::setGapPixelsEnable(const bool b) {
if (gapPixelsEnable != b) {
gapPixelsEnable = b;
// side effects
generalData->SetGapPixelsEnable(b, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG(logINFO) << "Gap Pixels Enable: " << gapPixelsEnable;
return OK;
}
void UDPStandardImplementation::setFileFormat(const fileFormat f){
switch(f){
#ifdef HDF5C
case HDF5:
fileFormatType = HDF5;
break;
#endif
default:
fileFormatType = BINARY;
break;
}
//destroy file writer, set file format and create file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
FILE_LOG(logINFO) << "File Format:" << getFileFormatType(fileFormatType);
}
void UDPStandardImplementation::setFileWriteEnable(const bool b){
if (fileWriteEnable != b){
fileWriteEnable = b;
for (unsigned int i = 0; i < dataProcessor.size(); ++i) {
dataProcessor[i]->SetupFileWriter(fileWriteEnable, (int*)numDet,
&framesPerFile, fileName, filePath, &fileIndex, &overwriteEnable,
&detID, &numThreads, &numberOfFrames, &dynamicRange, &udpPortNum[i],
generalData);
}
}
FILE_LOG(logINFO) << "File Write Enable: " << stringEnable(fileWriteEnable);
}
int UDPStandardImplementation::setROI(const std::vector<slsReceiverDefs::ROI> i) {
if (myDetectorType != GOTTHARD) {
cprintf(RED, "Error: Can not set ROI for this detector\n");
return FAIL;
}
bool change = false;
if (roi.size() != i.size())
change = true;
else {
for (unsigned int iloop = 0; iloop < i.size(); ++iloop) {
if (
(roi[iloop].xmin != i[iloop].xmin) ||
(roi[iloop].xmax != i[iloop].xmax) ||
(roi[iloop].ymin != i[iloop].ymin) ||
(roi[iloop].xmax != i[iloop].xmax)) {
change = true;
break;
}
}
}
if (change) {
roi = i;
generalData->SetROI(i);
framesPerFile = generalData->maxFramesPerFile;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetGeneralData(generalData);
}
std::stringstream sstm;
sstm << "ROI: ";
if (!roi.size())
sstm << "0";
else {
for (unsigned int i = 0; i < roi.size(); ++i) {
sstm << "( " <<
roi[i].xmin << ", " <<
roi[i].xmax << ", " <<
roi[i].ymin << ", " <<
roi[i].ymax << " )";
}
}
std::string message = sstm.str();
FILE_LOG(logINFO) << message;
return OK;
}
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
if (frameToGuiFrequency != freq) {
frameToGuiFrequency = freq;
}
FILE_LOG(logINFO) << "Frame to Gui Frequency: " << frameToGuiFrequency;
return OK;
}
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
if (dataStreamEnable != enable) {
dataStreamEnable = enable;
//data sockets have to be created again as the client ones are
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
if (enable) {
for ( int i = 0; i < numThreads; ++i ) {
try {
DataStreamer* s = new DataStreamer(i, fifo[i], &dynamicRange,
&roi, &fileIndex, flippedData, additionalJsonHeader, &silentMode);
dataStreamer.push_back(s);
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP);
}
catch(...) {
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
}
SetThreadPriorities();
}
}
FILE_LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
return OK;
}
int UDPStandardImplementation::setNumberofSamples(const uint64_t i) {
if (numberOfSamples != i) {
numberOfSamples = i;
generalData->setNumberofSamples(i, nroichannels);
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG (logINFO) << "Number of Samples: " << numberOfSamples;
FILE_LOG (logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
return OK;
}
int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
if (dynamicRange != i) {
dynamicRange = i;
//side effects
generalData->SetDynamicRange(i,tengigaEnable);
generalData->SetGapPixelsEnable(gapPixelsEnable, dynamicRange);
// to update npixelsx, npixelsy in file writer
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetPixelDimension();
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG(logINFO) << "Dynamic Range: " << dynamicRange;
return OK;
}
int UDPStandardImplementation::setTenGigaEnable(const bool b) {
if (tengigaEnable != b) {
tengigaEnable = b;
//side effects
generalData->SetTenGigaEnable(b,dynamicRange);
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG(logINFO) << "Ten Giga: " << stringEnable(tengigaEnable);
return OK;
}
int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
if (fifoDepth != i) {
fifoDepth = i;
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
}
FILE_LOG(logINFO) << "Fifo Depth: " << i;
return OK;
}
int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG(logDEBUG) << "Setting receiver type";
DeleteMembers();
InitializeMembers();
myDetectorType = d;
switch(myDetectorType) {
case GOTTHARD:
case PROPIX:
case MOENCH:
case EIGER:
case JUNGFRAUCTB:
case JUNGFRAU:
FILE_LOG(logINFO) << " ***** " << getDetectorType(d) << " Receiver *****";
break;
default:
FILE_LOG(logERROR) << "This is an unknown receiver type " << (int)d;
return FAIL;
}
//set detector specific variables
switch(myDetectorType) {
case GOTTHARD: generalData = new GotthardData(); break;
case PROPIX: generalData = new PropixData(); break;
case MOENCH: generalData = new Moench02Data(); break;
case EIGER: generalData = new EigerData(); break;
case JUNGFRAUCTB: generalData = new JCTBData(); break;
case JUNGFRAU: generalData = new JungfrauData(); break;
default: break;
}
numThreads = generalData->threadsPerReceiver;
fifoDepth = generalData->defaultFifoDepth;
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
framesPerFile = generalData->maxFramesPerFile;
//local network parameters
SetLocalNetworkParameters();
//create fifo structure
numberofJobs = -1;
if (SetupFifoStructure() == FAIL) {
FILE_LOG(logERROR) << "Could not allocate memory for fifo structure";
return FAIL;
}
//create threads
for ( int i = 0; i < numThreads; ++i ) {
try {
Listener* l = new Listener(i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode, &activated, &deactivatedPaddingEnable, &silentMode);
listener.push_back(l);
DataProcessor* p = new DataProcessor(i, myDetectorType, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
&framePadding, &activated, &deactivatedPaddingEnable, &silentMode,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
dataProcessor.push_back(p);
}
catch (...) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
return FAIL;
}
}
//set up writer and callbacks
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
SetThreadPriorities();
// check udp socket buffer size
setUDPSocketBufferSize(udpSocketBufferSize);
FILE_LOG(logDEBUG) << " Detector type set to " << getDetectorType(d);
return OK;
}
void UDPStandardImplementation::setDetectorPositionId(const int i){
detID = i;
FILE_LOG(logINFO) << "Detector Position Id:" << detID;
for (unsigned int i = 0; i < dataProcessor.size(); ++i) {
dataProcessor[i]->SetupFileWriter(fileWriteEnable, (int*)numDet,
&framesPerFile, fileName, filePath, &fileIndex, &overwriteEnable,
&detID, &numThreads, &numberOfFrames, &dynamicRange, &udpPortNum[i],
generalData);
}
for (unsigned int i = 0; i < listener.size(); ++i) {
uint16_t row = 0, col = 0;
row = detID % numDet[1]; // row
col = (detID / numDet[1]) * ((myDetectorType == EIGER) ? 2 : 1) + i; // col for horiz. udp ports
listener[i]->SetHardCodedPosition(row, col);
}
}
void UDPStandardImplementation::resetAcquisitionCount() {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
FILE_LOG(logINFO) << "Acquisition Count has been reset";
}
int UDPStandardImplementation::startReceiver(char *c) {
cprintf(RESET,"\n");
FILE_LOG(logINFO) << "Starting Receiver";
ResetParametersforNewMeasurement();
//listener
if (CreateUDPSockets() == FAIL) {
strcpy(c,"Could not create UDP Socket(s).");
FILE_LOG(logERROR) << c;
return FAIL;
}
//callbacks
if (startAcquisitionCallBack) {
startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
if (rawDataReadyCallBack != NULL) {
FILE_LOG(logINFO) << "Data Write has been defined externally";
}
}
//processor->writer
if (fileWriteEnable) {
if (SetupWriter() == FAIL) {
strcpy(c,"Could not create file.");
FILE_LOG(logERROR) << c;
return FAIL;
}
} else
FILE_LOG(logINFO) << "File Write Disabled";
FILE_LOG(logINFO) << "Ready ...";
//status
status = RUNNING;
//Let Threads continue to be ready for acquisition
StartRunning();
FILE_LOG(logINFO) << "Receiver Started";
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
return OK;
}
void UDPStandardImplementation::stopReceiver(){
FILE_LOG(logINFO) << "Stopping Receiver";
//set status to transmitting
startReadout();
//wait for the processes (Listener and DataProcessor) to be done
bool running = true;
while(running) {
running = false;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
if ((*it)->IsRunning())
running = true;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
}
//create virtual file
if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
//to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught);
}
//wait for the processes (DataStreamer) to be done
running = true;
while(running) {
running = false;
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
}
status = RUN_FINISHED;
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
{ //statistics
uint64_t tot = 0;
for (int i = 0; i < numThreads; i++) {
tot += dataProcessor[i]->GetNumFramesCaught();
uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught();
if ((int)missingpackets > 0) {
cprintf(RED, "\n[Port %d]\n",udpPortNum[i]);
cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets);
cprintf(RED, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught());
cprintf(RED, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught());
}else{
cprintf(GREEN, "\n[Port %d]\n",udpPortNum[i]);
cprintf(GREEN, "Missing Packets\t\t: %lld\n",(long long int)missingpackets);
cprintf(GREEN, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught());
cprintf(GREEN, "Last Frame Caught\t: %lld\n",(long long int)listener[i]->GetLastFrameIndexCaught());
}
}
if(!activated)
cprintf(RED,"Note: Deactivated Receiver\n");
//callback
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((tot/numThreads), pAcquisitionFinished);
}
//change status
status = IDLE;
FILE_LOG(logINFO) << "Receiver Stopped";
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
}
void UDPStandardImplementation::startReadout(){
if(status == RUNNING){
// wait for incoming delayed packets
//current packets caught
volatile int totalP = 0,prev=-1;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait as long as there is change from prev totalP,
while(prev != totalP){
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n",
prev,totalP);
#endif
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
prev = totalP;
totalP = 0;
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
#endif
}
}
//set status
status = TRANSMITTING;
FILE_LOG(logINFO) << "Status: Transmitting";
}
//shut down udp sockets so as to make listeners push dummy (end) packets for processors
shutDownUDPSockets();
}
void UDPStandardImplementation::shutDownUDPSockets() {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
}
void UDPStandardImplementation::closeFiles() {
uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles();
maxIndexCaught = std::max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
}
//to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught);
}
int UDPStandardImplementation::setUDPSocketBufferSize(const uint32_t s) {
if (listener.size())
return listener[0]->CreateDummySocketForUDPSocketBufferSize(s);
return FAIL;
}
int UDPStandardImplementation::restreamStop() {
bool ret = OK;
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
if ((*it)->RestreamStop() == FAIL)
ret = FAIL;
}
// if fail, prints in datastreamer
if (ret == OK) {
FILE_LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful";
}
return ret;
}
void UDPStandardImplementation::SetLocalNetworkParameters() {
// to increase Max length of input packet queue
int max_back_log;
const char *proc_file_name = "/proc/sys/net/core/netdev_max_backlog";
{
std::ifstream proc_file(proc_file_name);
proc_file >> max_back_log;
}
if (max_back_log < MAX_SOCKET_INPUT_PACKET_QUEUE) {
std::ofstream proc_file(proc_file_name);
if (proc_file.good()) {
proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << std::endl;
cprintf(GREEN, "Max length of input packet queue "
"[/proc/sys/net/core/netdev_max_backlog] modified to %d\n",
MAX_SOCKET_INPUT_PACKET_QUEUE);
} else {
const char *msg = "Could not change max length of "
"input packet queue [net.core.netdev_max_backlog]. (No Root Privileges?)";
FILE_LOG(logWARNING) << msg;
}
}
}
void UDPStandardImplementation::SetThreadPriorities() {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Could not prioritize listener threads. (No Root Privileges?)";
return;
}
}
std::ostringstream osfn;
osfn << "Priorities set - "
"Listener:" << LISTENER_PRIORITY;
FILE_LOG(logINFO) << osfn.str();
}
int UDPStandardImplementation::SetupFifoStructure() {
numberofJobs = 1;
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
for ( int i = 0; i < numThreads; ++i ) {
//create fifo structure
try {
Fifo* f = new Fifo (i,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth);
fifo.push_back(f);
} catch (...) {
cprintf(RED,"Error: Could not allocate memory for fifo structure of index %d\n", i);
for (std::vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
return FAIL;
}
//set the listener & dataprocessor threads to point to the right fifo
if(listener.size())listener[i]->SetFifo(fifo[i]);
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]);
if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i]);
}
FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ;
FILE_LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed";
return OK;
}
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
if (dataStreamEnable) {
char fnametostream[MAX_STR_LENGTH];
snprintf(fnametostream, MAX_STR_LENGTH, "%s/%s", filePath, fileName);
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement(fnametostream);
}
}
int UDPStandardImplementation::CreateUDPSockets() {
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
if (listener[i]->CreateUDPSockets() == FAIL) {
error = true;
break;
}
if (error) {
shutDownUDPSockets();
return FAIL;
}
FILE_LOG(logDEBUG) << "UDP socket(s) created successfully.";
return OK;
}
int UDPStandardImplementation::SetupWriter() {
bool error = false;
for (unsigned int i = 0; i < dataProcessor.size(); ++i)
if (dataProcessor[i]->CreateNewFile(tengigaEnable,
numberOfFrames, acquisitionTime, subExpTime, subPeriod, acquisitionPeriod) == FAIL) {
error = true;
break;
}
if (error) {
shutDownUDPSockets();
closeFiles();
return FAIL;
}
return OK;
}
void UDPStandardImplementation::StartRunning() {
//set running mask and post semaphore to start the inner loop in execution thread
for (std::vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
(*it)->StartRunning();
(*it)->Continue();
}
for (std::vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}
for (std::vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}
}

64
slsReceiverSoftware/src/main.cpp Normal file → Executable file
View File

@@ -1,17 +1,14 @@
/* A simple server in the internet domain using TCP
The port number is passed as an argument */
#include "sls_receiver_defs.h"
#include "sls_detector_defs.h"
#include "slsReceiverUsers.h"
#include "logger.h"
#include <iostream>
#include <string.h>
#include <signal.h> //SIGINT
#include <cstdlib> //system
#include "utilities.h"
#include "logger.h"
#include <sys/types.h> //wait
#include <sys/wait.h> //wait
#include <unistd.h> //usleep
@@ -24,37 +21,40 @@ void sigInterruptHandler(int p){
keeprunning = false;
}
/** Define Colors to print data call back in different colors for different recievers */
/*
int StartAcq(char* filepath, char* filename, uint64_t fileindex, uint32_t datasize, void*p){
printf("#### StartAcq: filepath:%s filename:%s fileindex:%llu datasize:%u ####\n",
filepath, filename, fileindex, datasize);
#define PRINT_IN_COLOR(c,f, ...) printf ("\033[%dm" f RESET, 30 + c+1, ##__VA_ARGS__)
cprintf(BLUE, "--StartAcq: returning 0\n");
int StartAcq(char* filepath, char* filename, uint64_t fileindex, uint32_t datasize, void*p){
FILE_LOG(logINFO) << "#### StartAcq: "
"filepath: " << filepath << "filename: " << filename <<
"fileindex: " << fileindex << "datasize: " << datasize << " ####";
FILE_LOG(logINFO) << "--StartAcq: returning 0";
return 0;
}
void AcquisitionFinished(uint64_t frames, void*p){
cprintf(BLUE, "#### AcquisitionFinished: frames:%llu ####\n",frames);
FILE_LOG(logINFO) << "#### AcquisitionFinished: frames:" << frames << " ####";
}
void GetData(char* metadata, char* datapointer, uint32_t datasize, void* p){
slsReceiverDefs::sls_receiver_header* header = (slsReceiverDefs::sls_receiver_header*)metadata;
slsReceiverDefs::sls_detector_header detectorHeader = header->detHeader;
slsDetectorDefs::sls_receiver_header* header = (slsDetectorDefs::sls_receiver_header*)metadata;
slsDetectorDefs::sls_detector_header detectorHeader = header->detHeader;
PRINT_IN_COLOR (detectorHeader.modId?detectorHeader.modId:detectorHeader.row,
"#### %d GetData: ####\n"
"frameNumber: %llu\t\texpLength: %u\t\tpacketNumber: %u\t\tbunchId: %llu"
"\t\ttimestamp: %llu\t\tmodId: %u\t\t"
"xCrow%u\t\tcolumn: %u\t\tcolumn: %u\t\tdebug: %u"
"frameNumber: %lu\t\texpLength: %u\t\tpacketNumber: %u\t\tbunchId: %lu"
"\t\ttimestamp: %lu\t\tmodId: %u\t\t"
"row: %u\t\tcolumn: %u\t\treserved: %u\t\tdebug: %u"
"\t\troundRNumber: %u\t\tdetType: %u\t\tversion: %u"
//"\t\tpacketsMask:%s"
"\t\tfirstbytedata: 0x%x\t\tdatsize: %u\n\n",
detectorHeader.row, detectorHeader.frameNumber,
detectorHeader.expLength, detectorHeader.packetNumber, detectorHeader.bunchId,
detectorHeader.timestamp, detectorHeader.modId,
detectorHeader.row, detectorHeader.column, detectorHeader.column,
detectorHeader.row, (long unsigned int)detectorHeader.frameNumber,
detectorHeader.expLength, detectorHeader.packetNumber, (long unsigned int)detectorHeader.bunchId,
(long unsigned int)detectorHeader.timestamp, detectorHeader.modId,
detectorHeader.row, detectorHeader.column, detectorHeader.reserved,
detectorHeader.debug, detectorHeader.roundRNumber,
detectorHeader.detType, detectorHeader.version,
//header->packetsMask.to_string().c_str(),
@@ -66,15 +66,15 @@ void GetData(char* metadata, char* datapointer, uint32_t datasize, void* p){
int main(int argc, char *argv[]) {
keeprunning = true;
cprintf(BLUE,"Created [ Tid: %ld ]\n", (long)syscall(SYS_gettid));
FILE_LOG(logINFOBLUE) << "Created [ Tid: " << syscall(SYS_gettid) << " ]";
// Catch signal SIGINT to close files and call destructors properly
struct sigaction sa;
sa.sa_flags=0; // no flags
sa.sa_handler=sigInterruptHandler; // handler function
sigemptyset(&sa.sa_mask); // dont block additional signals during invocation of handler
if (sigaction(SIGINT, &sa, NULL) == -1) {
cprintf(RED, "Could not set handler function for SIGINT\n");
if (sigaction(SIGINT, &sa, nullptr) == -1) {
FILE_LOG(logERROR) << "Could not set handler function for SIGINT";
}
@@ -84,16 +84,16 @@ int main(int argc, char *argv[]) {
asa.sa_flags=0; // no flags
asa.sa_handler=SIG_IGN; // handler function
sigemptyset(&asa.sa_mask); // dont block additional signals during invocation of handler
if (sigaction(SIGPIPE, &asa, NULL) == -1) {
cprintf(RED, "Could not set handler function for SIGPIPE\n");
if (sigaction(SIGPIPE, &asa, nullptr) == -1) {
FILE_LOG(logERROR) << "Could not set handler function for SIGPIPE";
}
int ret = slsReceiverDefs::OK;
int ret = slsDetectorDefs::OK;
slsReceiverUsers *receiver = new slsReceiverUsers(argc, argv, ret);
if(ret==slsReceiverDefs::FAIL){
if(ret==slsDetectorDefs::FAIL){
delete receiver;
cprintf(BLUE,"Exiting [ Tid: %ld ]\n", (long)syscall(SYS_gettid));
FILE_LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
exit(EXIT_FAILURE);
}
@@ -140,20 +140,20 @@ int main(int argc, char *argv[]) {
//start tcp server thread
if (receiver->start() == slsReceiverDefs::FAIL){
if (receiver->start() == slsDetectorDefs::FAIL){
delete receiver;
cprintf(BLUE,"Exiting [ Tid: %ld ]\n", (long)syscall(SYS_gettid));
FILE_LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
exit(EXIT_FAILURE);
}
FILE_LOG(logINFO) << "Ready ... ";
cprintf(RESET, "\n[ Press \'Ctrl+c\' to exit ]\n");
FILE_LOG(logINFO) << "[ Press \'Ctrl+c\' to exit ]";
while(keeprunning)
pause();
delete receiver;
cprintf(BLUE,"Exiting [ Tid: %ld ]\n", (long)syscall(SYS_gettid));
FILE_LOG(logINFO) << "Goodbye!";
FILE_LOG(logINFOBLUE) << "Exiting [ Tid: " << syscall(SYS_gettid) << " ]";
FILE_LOG(logINFO) << "Exiting Receiver";
return 0;
}

53
slsReceiverSoftware/src/slsReceiver.cpp Normal file → Executable file
View File

@@ -11,31 +11,30 @@
#include <map>
#include <getopt.h>
#include "container_utils.h" // For sls::make_unique<>
#include "slsReceiver.h"
#include "gitInfoReceiver.h"
#include "slsReceiverTCPIPInterface.h"
#include "sls_detector_exceptions.h"
#include "versionAPI.h"
#include "logger.h"
slsReceiver::slsReceiver(int argc, char *argv[]):
tcpipInterface (0) {
tcpipInterface (nullptr) {
// options
std::map<std::string, std::string> configuration_map;
int tcpip_port_no = 1954;
std::string fname = "";
int64_t tempval = 0;
//parse command line for config
static struct option long_options[] = {
// These options set a flag.
//{"verbose", no_argument, &verbose_flag, 1},
// These options dont set a flag. We distinguish them by their indices.
{"config", required_argument, 0, 'f'},
{"rx_tcpport", required_argument, 0, 't'},
{"version", no_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
{"rx_tcpport", required_argument, nullptr, 't'},
{"version", no_argument, nullptr, 'v'},
{"help", no_argument, nullptr, 'h'},
{nullptr, 0, nullptr, 0}
};
//initialize global optind variable (required when instantiating multiple receivers in the same process)
@@ -53,22 +52,13 @@ slsReceiver::slsReceiver(int argc, char *argv[]):
switch(c){
case 'f':
fname = optarg;
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG) << long_options[option_index].name << " " << optarg << endl;
#endif
break;
case 't':
sscanf(optarg, "%d", &tcpip_port_no);
break;
case 'v':
tempval = GITREV;
tempval = (tempval <<32) | GITDATE;
std::cout << "SLS Receiver " << GITBRANCH << " (0x" << std::hex << tempval << ")" << std::endl;
throw std::exception();
std::cout << "SLS Receiver " << GITBRANCH << " (0x" << std::hex << APIRECEIVER << ")" << std::endl;
throw sls::RuntimeError();
case 'h':
default:
@@ -76,30 +66,25 @@ slsReceiver::slsReceiver(int argc, char *argv[]):
+ std::string(argv[0]) + "\n"
+ "Usage: " + std::string(argv[0]) + " [arguments]\n"
+ "Possible arguments are:\n"
+ "\t-f, --config <fname> : Loads config from file\n"
+ "\t-t, --rx_tcpport <port> : TCP Communication Port with client. \n"
+ "\t Default: 1954. Required for multiple \n"
+ "\t receivers\n\n";
FILE_LOG(logINFO) << help_message << std::endl;
throw std::exception();
throw sls::RuntimeError();
}
}
if( !fname.empty() && read_config_file(fname, &tcpip_port_no, &configuration_map) == FAIL) {
throw std::exception();
}
// might throw an exception
tcpipInterface = new slsReceiverTCPIPInterface(tcpip_port_no);
tcpipInterface = sls::make_unique<slsReceiverTCPIPInterface>(tcpip_port_no);
}
slsReceiver::~slsReceiver() {
if(tcpipInterface)
delete tcpipInterface;
slsReceiver::slsReceiver(int tcpip_port_no)
{
// might throw an exception
tcpipInterface = sls::make_unique<slsReceiverTCPIPInterface>(tcpip_port_no);
}

File diff suppressed because it is too large Load Diff

3031
slsReceiverSoftware/src/slsReceiverTCPIPInterface.cpp Normal file → Executable file

File diff suppressed because it is too large Load Diff

14
slsReceiverSoftware/src/slsReceiverUsers.cpp Normal file → Executable file
View File

@@ -1,19 +1,19 @@
#include "container_utils.h" // For sls::make_unique<>
#include "slsReceiverUsers.h"
#include "slsReceiver.h"
slsReceiverUsers::slsReceiverUsers(int argc, char *argv[], int &success) {
// catch the exception here to limit it to within the library (for current version)
try {
slsReceiver* r = new slsReceiver(argc, argv);
receiver = r;
success = slsReceiverDefs::OK;
receiver = sls::make_unique<slsReceiver>(argc, argv);
success = slsDetectorDefs::OK;
} catch (...) {
success = slsReceiverDefs::FAIL;
success = slsDetectorDefs::FAIL;
}
}
slsReceiverUsers::~slsReceiverUsers() {
delete receiver;
slsReceiverUsers::slsReceiverUsers(int tcpip_port_no) {
receiver = sls::make_unique<slsReceiver>(tcpip_port_no);
}
int slsReceiverUsers::start() {

View File

@@ -1,80 +0,0 @@
#include <iostream>
#include <string>
#include <sstream>
#include <iostream>
#include <stdio.h>
#include <fstream>
#include <map>
#include "utilities.h"
#include "logger.h"
int read_config_file(std::string fname, int *tcpip_port_no, std::map<std::string, std::string> * configuration_map ){
std::ifstream infile;
std::string sLine,sargname, sargvalue;
int iline = 0;
int success = slsReceiverDefs::OK;
FILE_LOG(logINFO) << "config file name " << fname;
try {
infile.open(fname.c_str(), std::ios_base::in);
} catch(...) {
FILE_LOG(logERROR) << "Could not open configuration file " << fname ;
success = slsReceiverDefs::FAIL;
}
if (success == slsReceiverDefs::OK && infile.is_open()) {
while(infile.good()){
getline(infile,sLine);
iline++;
//VERBOSE_PRINT(sLine);
if(sLine.find('#') != std::string::npos)
continue;
else if(sLine.length()<2)
continue;
else{
std::istringstream sstr(sLine);
//parameter name
if(sstr.good()){
sstr >> sargname;
if (! sstr.good())
continue;
sstr >> sargvalue;
(*configuration_map)[sargname] = sargvalue;
}
//tcp port
if(sargname=="rx_tcpport"){
if(sstr.good()) {
sstr >> sargname;
if(sscanf(sargname.c_str(),"%d",tcpip_port_no))
cprintf(RESET, "dataport: %d\n" , *tcpip_port_no);
else{
cprintf(RED, "could not decode port in config file. Exiting.\n");
success = slsReceiverDefs::FAIL;
}
}
}
}
}
infile.close();
}
return success;
}