slsReceiver: remove static members in Listener, DataProcessing and DataStreamer:

* Needed when more than one receiver is created by process
* Replace NumberofXxxx by explicit index in constructor
* Remove Error[Mask], use return value in constructor signature
* Replace RunningMask by individual Running flags
* Remove obsolete Mutex objects
This commit is contained in:
2018-04-25 09:24:39 +02:00
parent 62a88dadba
commit b5909044f6
11 changed files with 200 additions and 344 deletions

View File

@ -22,18 +22,8 @@ using namespace std;
const string DataProcessor::TypeName = "DataProcessor";
int DataProcessor::NumberofDataProcessors(0);
uint64_t DataProcessor::ErrorMask(0x0);
uint64_t DataProcessor::RunningMask(0x0);
pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER;
bool DataProcessor::SilentMode(false);
DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool fwenable, bool* dsEnable, bool* gpEnable, uint32_t* dr,
DataProcessor::DataProcessor(int& ret, int ind, Fifo*& f, fileFormat* ftype, bool fwenable, bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t,
uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t,
@ -44,7 +34,8 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool fwenable, bool* d
char*, uint32_t &, void*),
void *pDataReadycb) :
ThreadObject(NumberofDataProcessors),
ThreadObject(ind),
runningFlag(0),
generalData(0),
fifo(f),
file(0),
@ -65,18 +56,16 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool fwenable, bool* d
numTotalFramesCaught(0),
numFramesCaught(0),
currentFrameIndex(0),
silentMode(false),
rawDataReadyCallBack(dataReadycb),
rawDataModifyReadyCallBack(dataModifyReadycb),
pRawDataReady(pDataReadycb)
{
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
ret = FAIL;
if(ThreadObject::CreateThread() == OK)
ret = OK;
NumberofDataProcessors++;
FILE_LOG(logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors;
FILE_LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void*)&timerBegin, 0, sizeof(timespec));
}
@ -86,35 +75,15 @@ DataProcessor::~DataProcessor() {
if (file) delete file;
if (tempBuffer) delete [] tempBuffer;
ThreadObject::DestroyThread();
NumberofDataProcessors--;
}
/** static functions */
uint64_t DataProcessor::GetErrorMask() {
return ErrorMask;
}
uint64_t DataProcessor::GetRunningMask() {
return RunningMask;
}
void DataProcessor::ResetRunningMask() {
RunningMask = 0x0;
}
void DataProcessor::SetSilentMode(bool mode) {
SilentMode = mode;
}
/** non static functions */
/** getters */
string DataProcessor::GetType(){
return TypeName;
}
bool DataProcessor::IsRunning() {
return ((1 << index) & RunningMask);
return runningFlag;
}
bool DataProcessor::GetAcquisitionStartedFlag(){
@ -149,16 +118,12 @@ uint64_t DataProcessor::GetProcessedMeasurementIndex() {
/** setters */
void DataProcessor::StartRunning() {
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = true;
}
void DataProcessor::StopRunning() {
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = false;
}
void DataProcessor::SetFifo(Fifo*& f) {
@ -173,6 +138,7 @@ void DataProcessor::ResetParametersforNewAcquisition() {
}
void DataProcessor::ResetParametersforNewMeasurement(){
runningFlag = false;
numFramesCaught = 0;
firstMeasurementIndex = 0;
measurementStartedFlag = false;
@ -253,7 +219,7 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, char* fname, char* fpath,
if (g)
generalData = g;
// fix xcoord as detector is not providing it right now
xcoordin1D = ((NumberofDataProcessors > (*nunits)) ? index : ((*dindex) * (*nunits)) + index);
xcoordin1D = ((*dindex) * (*nunits)) + index;
if (file) {
@ -267,13 +233,13 @@ void DataProcessor::SetupFileWriter(bool fwe, int* nd, char* fname, char* fpath,
file = new HDF5File(index, generalData->maxFramesPerFile,
nd, fname, fpath, findex, owenable,
dindex, nunits, nf, dr, portno,
generalData->nPixelsX, generalData->nPixelsY, &SilentMode);
generalData->nPixelsX, generalData->nPixelsY, &silentMode);
break;
#endif
default:
file = new BinaryFile(index, generalData->maxFramesPerFile,
nd, fname, fpath, findex, owenable,
dindex, nunits, nf, dr, portno, &SilentMode);
dindex, nunits, nf, dr, portno, &silentMode);
break;
}
}
@ -499,6 +465,10 @@ void DataProcessor::SetPixelDimension() {
}
}
void DataProcessor::SetSilentMode(bool mode) {
silentMode = mode;
}
/** eiger specific */
void DataProcessor::InsertGapPixels(char* buf, uint32_t dr) {

View File

@ -15,19 +15,10 @@ using namespace std;
const string DataStreamer::TypeName = "DataStreamer";
int DataStreamer::NumberofDataStreamers(0);
uint64_t DataStreamer::ErrorMask(0x0);
uint64_t DataStreamer::RunningMask(0x0);
pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER;
bool DataStreamer::SilentMode(false);
DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh) :
ThreadObject(NumberofDataStreamers),
DataStreamer::DataStreamer(int& ret, int ind, Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, int* fd, char* ajh) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
fifo(f),
zmqSocket(0),
@ -40,16 +31,14 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable, uint64_t* fi, i
firstMeasurementIndex(0),
completeBuffer(0),
flippedData(fd),
additionJsonHeader(ajh)
additionJsonHeader(ajh),
silentMode(false)
{
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
ret = FAIL;
if(ThreadObject::CreateThread() == OK)
ret = OK;
NumberofDataStreamers++;
FILE_LOG(logDEBUG) << "Number of DataStreamers: " << NumberofDataStreamers;
FILE_LOG(logDEBUG) << "DataStreamer " << ind << " created";
strcpy(fileNametoStream, "");
}
@ -59,51 +48,26 @@ DataStreamer::~DataStreamer() {
CloseZmqSocket();
if (completeBuffer) delete [] completeBuffer;
ThreadObject::DestroyThread();
NumberofDataStreamers--;
}
/** static functions */
uint64_t DataStreamer::GetErrorMask() {
return ErrorMask;
}
uint64_t DataStreamer::GetRunningMask() {
return RunningMask;
}
void DataStreamer::ResetRunningMask() {
RunningMask = 0x0;
}
void DataStreamer::SetSilentMode(bool mode) {
SilentMode = mode;
}
/** non static functions */
/** getters */
string DataStreamer::GetType(){
return TypeName;
}
bool DataStreamer::IsRunning() {
return ((1 << index) & RunningMask);
return runningFlag;
}
/** setters */
void DataStreamer::StartRunning() {
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = true;
}
void DataStreamer::StopRunning() {
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = false;
}
void DataStreamer::SetFifo(Fifo*& f) {
@ -116,6 +80,7 @@ void DataStreamer::ResetParametersforNewAcquisition() {
}
void DataStreamer::ResetParametersforNewMeasurement(char* fname){
runningFlag = false;
firstMeasurementIndex = 0;
measurementStartedFlag = false;
strcpy(fileNametoStream, fname);
@ -295,7 +260,7 @@ int DataStreamer::SendHeader(sls_detector_header* header, uint32_t size, uint32_
int DataStreamer::restreamStop() {
int DataStreamer::RestreamStop() {
//send dummy header
int ret = zmqSocket->SendHeaderData(index, true, SLS_DETECTOR_JSON_HEADER_VERSION);
if (!ret) {
@ -304,3 +269,9 @@ int DataStreamer::restreamStop() {
}
return OK;
}
void DataStreamer::SetSilentMode(bool mode) {
silentMode = mode;
}

View File

@ -12,10 +12,9 @@
#include <cstring>
using namespace std;
int Fifo::NumberofFifoClassObjects(0);
Fifo::Fifo(uint32_t fifoItemSize, uint32_t depth, bool &success):
index(NumberofFifoClassObjects),
Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t depth, bool &success):
index(ind),
memory(0),
fifoBound(0),
fifoFree(0),
@ -24,7 +23,7 @@ Fifo::Fifo(uint32_t fifoItemSize, uint32_t depth, bool &success):
status_fifoBound(0),
status_fifoFree(depth){
FILE_LOG(logDEBUG) << __AT__ << " called";
NumberofFifoClassObjects++;
success = true;
if(CreateFifos(fifoItemSize) == FAIL)
success = false;
}
@ -34,7 +33,6 @@ Fifo::~Fifo() {
FILE_LOG(logDEBUG) << __AT__ << " called";
//cprintf(BLUE,"Fifo Object %d: Goodbye\n", index);
DestroyFifos();
NumberofFifoClassObjects--;
}

View File

@ -18,19 +18,11 @@ using namespace std;
const string Listener::TypeName = "Listener";
int Listener::NumberofListeners(0);
uint64_t Listener::ErrorMask(0x0);
uint64_t Listener::RunningMask(0x0);
pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER;
bool Listener::SilentMode(false);
Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
ThreadObject(NumberofListeners),
Listener::Listener(int& ret, int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
fifo(f),
myDetectorType(dtype),
@ -51,15 +43,14 @@ Listener::Listener(detectorType dtype, Fifo*& f, runStatus* s, uint32_t* portno,
carryOverFlag(0),
carryOverPacket(0),
listeningPacket(0),
udpSocketAlive(0)
udpSocketAlive(0),
silentMode(false)
{
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
NumberofListeners++;
FILE_LOG(logDEBUG) << "Number of Listeners: " << NumberofListeners;
ret = FAIL;
if(ThreadObject::CreateThread() == OK)
ret = OK;
FILE_LOG(logDEBUG) << "Listener " << ind << " created";
}
@ -70,35 +61,15 @@ Listener::~Listener() {
if (carryOverPacket) delete [] carryOverPacket;
if (listeningPacket) delete [] listeningPacket;
ThreadObject::DestroyThread();
NumberofListeners--;
}
/** static functions */
uint64_t Listener::GetErrorMask() {
return ErrorMask;
}
uint64_t Listener::GetRunningMask() {
return RunningMask;
}
void Listener::ResetRunningMask() {
RunningMask = 0x0;
}
void Listener::SetSilentMode(bool mode) {
SilentMode = mode;
}
/** non static functions */
/** getters */
string Listener::GetType(){
return TypeName;
}
bool Listener::IsRunning() {
return ((1 << index) & RunningMask);
return runningFlag;
}
bool Listener::GetAcquisitionStartedFlag(){
@ -119,16 +90,12 @@ uint64_t Listener::GetLastFrameIndexCaught() {
/** setters */
void Listener::StartRunning() {
pthread_mutex_lock(&Mutex);
RunningMask |= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = true;
}
void Listener::StopRunning() {
pthread_mutex_lock(&Mutex);
RunningMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
runningFlag = false;
}
@ -146,6 +113,7 @@ void Listener::ResetParametersforNewAcquisition() {
void Listener::ResetParametersforNewMeasurement() {
runningFlag = false;
measurementStartedFlag = false;
numPacketsCaught = 0;
firstMeasurementIndex = 0;
@ -181,7 +149,7 @@ void Listener::RecordFirstIndices(uint64_t fnum) {
firstAcquisitionIndex = fnum;
}
if(!SilentMode) {
if(!silentMode) {
if (!index) cprintf(BLUE,"%d First Acquisition Index:%lu\n"
"%d First Measurement Index:%lu\n",
index, firstAcquisitionIndex,
@ -252,6 +220,10 @@ void Listener::ShutDownUDPSocket() {
}
void Listener::SetSilentMode(bool mode) {
silentMode = mode;
}
void Listener::ThreadExecution() {
char* buffer;
@ -304,7 +276,7 @@ void Listener::ThreadExecution() {
fifo->PushAddress(buffer);
//Statistics
if(!SilentMode) {
if(!silentMode) {
numFramesStatistic++;
if (numFramesStatistic >= generalData->maxFramesPerFile)
PrintFifoStatistics();

View File

@ -216,26 +216,31 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
dataStreamer.clear();
if (enable) {
bool error = false;
for ( int i = 0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &shortFrameEnable, &fileIndex, flippedData, additionalJsonHeader));
dataStreamer[i]->SetGeneralData(generalData);
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP) == FAIL) {
error = true;
break;
int ret = FAIL;
DataStreamer* s = new DataStreamer(ret, i, fifo[i], &dynamicRange, &shortFrameEnable, &fileIndex, flippedData, additionalJsonHeader);
if (ret == FAIL)
cprintf(RED,"Error: Could not create data callback threads\n");
else {
dataStreamer.push_back(s);
dataStreamer[i]->SetGeneralData(generalData);
if (dataStreamer[i]->CreateZmqSockets(&numThreads, streamingPort, streamingSrcIP) == FAIL) {
cprintf(RED,"Error: Could not create zmq sockets\n");
ret = FAIL;
}
}
// error in creating threads or zmq sockets
if (ret == FAIL) {
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
}
if (DataStreamer::GetErrorMask() || error) {
if (DataStreamer::GetErrorMask())
cprintf(RED,"Error: Could not create data callback threads\n");
else
cprintf(RED,"Error: Could not create zmq sockets\n");
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
SetThreadPriorities();
}
}
@ -311,9 +316,12 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
void UDPStandardImplementation::setSilentMode(const uint32_t i){
silentMode = i;
Listener::SetSilentMode(i);
DataProcessor::SetSilentMode(i);
DataStreamer::SetSilentMode(i);
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetSilentMode(i);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetSilentMode(i);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetSilentMode(i);
FILE_LOG(logINFO) << "Silent Mode: " << i;
}
@ -363,12 +371,18 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
//create threads
for ( int i=0; i < numThreads; ++i ) {
listener.push_back(new Listener(myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange));
dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable, &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
for ( int i = 0; i < numThreads; ++i ) {
int ret = FAIL;
Listener* l = new Listener(ret, i, myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange);
DataProcessor* p = NULL;
if (ret == OK)
p = new DataProcessor(ret, i, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable, &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
// error in creating threads
if (ret == FAIL) {
FILE_LOG(logERROR) << "Could not create listener/dataprocessor threads (index:" << i << ")";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
@ -378,6 +392,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
dataProcessor.clear();
return FAIL;
}
listener.push_back(l);
dataProcessor.push_back(p);
}
//set up writer and callbacks
@ -474,14 +491,20 @@ void UDPStandardImplementation::stopReceiver(){
//set status to transmitting
startReadout();
//wait for the processes to be done
while(Listener::GetRunningMask()){
usleep(5000);
}
while(DataProcessor::GetRunningMask()){
usleep(5000);
//wait for the processes (Listener and DataProcessor) to be done
bool running = true;
while(running) {
running = false;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
if ((*it)->IsRunning())
running = true;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
}
//create virtual file
if (fileWriteEnable && fileFormatType == HDF5) {
uint64_t maxIndexCaught = 0;
@ -495,9 +518,15 @@ void UDPStandardImplementation::stopReceiver(){
dataProcessor[0]->EndofAcquisition(maxIndexCaught); //to create virtual file
}
while(DataStreamer::GetRunningMask()){
usleep(5000);
}
//wait for the processes (DataStreamer) to be done
running = true;
while(running) {
running = false;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
if ((*it)->IsRunning())
running = true;
usleep(5000);
}
status = RUN_FINISHED;
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
@ -603,7 +632,7 @@ void UDPStandardImplementation::closeFiles() {
int UDPStandardImplementation::restreamStop() {
bool ret = OK;
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it) {
if ((*it)->restreamStop() == FAIL)
if ((*it)->RestreamStop() == FAIL)
ret = FAIL;
}
@ -667,11 +696,13 @@ int UDPStandardImplementation::SetupFifoStructure() {
delete(*it);
fifo.clear();
for ( int i = 0; i < numThreads; i++ ) {
//create fifo structure
bool success = true;
fifo.push_back( new Fifo (
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth, success));
Fifo* f = new Fifo (i,
(generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth, success);
//error
if (!success) {
cprintf(RED,"Error: Could not allocate memory for fifo structure of index %d\n", i);
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
@ -679,6 +710,8 @@ int UDPStandardImplementation::SetupFifoStructure() {
fifo.clear();
return FAIL;
}
fifo.push_back(f);
//set the listener & dataprocessor threads to point to the right fifo
if(listener.size())listener[i]->SetFifo(fifo[i]);
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]);
@ -693,10 +726,6 @@ int UDPStandardImplementation::SetupFifoStructure() {
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
Listener::ResetRunningMask();
DataProcessor::ResetRunningMask();
DataStreamer::ResetRunningMask();
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)