in the process of streamer

This commit is contained in:
Dhanya Maliakal
2017-02-27 15:38:46 +01:00
parent 936dfea8a7
commit 3b07afe3fc
16 changed files with 654 additions and 171 deletions

View File

@ -13,27 +13,24 @@
#include <cstdlib> //system
#include <cstring> //strcpy
#include <errno.h> //eperm
using namespace std;
/** cosntructor & destructor */
UDPStandardImplementation::UDPStandardImplementation() {
InitializeMembers();
}
UDPStandardImplementation::~UDPStandardImplementation() {
DeleteMembers();
}
void UDPStandardImplementation::DeleteMembers() {
if (generalData) { delete generalData; generalData=0;}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
@ -51,8 +48,6 @@ void UDPStandardImplementation::DeleteMembers() {
void UDPStandardImplementation::InitializeMembers() {
UDPBaseImplementation::initializeMembers();
acquisitionPeriod = SAMPLE_TIME_IN_NS;
@ -73,8 +68,6 @@ void UDPStandardImplementation::InitializeMembers() {
/*** Overloaded Functions called by TCP Interface ***/
uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -91,8 +84,6 @@ uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
}
uint64_t UDPStandardImplementation::getFramesCaught() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -108,8 +99,6 @@ uint64_t UDPStandardImplementation::getFramesCaught() const {
}
int64_t UDPStandardImplementation::getAcquisitionIndex() const {
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -126,8 +115,6 @@ int64_t UDPStandardImplementation::getAcquisitionIndex() const {
void UDPStandardImplementation::setFileFormat(const fileFormat f){
FILE_LOG(logDEBUG) << __AT__ << " starting";
switch(f){
#ifdef HDF5C
case HDF5:
@ -147,8 +134,6 @@ void UDPStandardImplementation::setFileFormat(const fileFormat f){
void UDPStandardImplementation::setFileName(const char c[]) {
if (strlen(c)) {
strcpy(fileName, c); //automatically update fileName in Filewriter (pointer)
/*int detindex = -1;
@ -169,8 +154,6 @@ void UDPStandardImplementation::setFileName(const char c[]) {
int UDPStandardImplementation::setShortFrameEnable(const int i) {
if (myDetectorType != GOTTHARD) {
cprintf(RED, "Error: Can not set short frame for this detector\n");
return FAIL;
@ -193,6 +176,8 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->SetGeneralData(generalData);
}
FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable;
return OK;
@ -200,8 +185,6 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
if (frameToGuiFrequency != freq) {
frameToGuiFrequency = freq;
@ -222,8 +205,6 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
if (dataStreamEnable != enable) {
dataStreamEnable = enable;
@ -233,16 +214,27 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
dataStreamer.clear();
if (enable) {
for ( int i=0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer());
if (DataStreamer::GetErrorMask()) {
cprintf(BG_RED,"Error: Could not create data callback threads\n");
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
return FAIL;
bool error = false;
for ( int i = 0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer(fifo[i], &frameToGuiFrequency, &frameToGuiTimerinMS, &dynamicRange));
dataStreamer[i]->SetGeneralData(generalData);
if (dataStreamer[i]->CreateZmqSockets() == FAIL) {
error = true;
break;
}
}
if (DataStreamer::GetErrorMask() || error) {
if (DataStreamer::GetErrorMask())
cprintf(BG_RED,"Error: Could not create data callback threads\n");
else
cprintf(BG_RED,"Error: Could not create zmq sockets\n");
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
dataStreamEnable = false;
return FAIL;
}
SetThreadPriorities();
}
}
FILE_LOG (logINFO) << "Data Send to Gui: " << dataStreamEnable;
@ -251,8 +243,6 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) {
if (acquisitionPeriod != i) {
acquisitionPeriod = i;
@ -273,8 +263,6 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) {
int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) {
if (acquisitionTime != i) {
acquisitionTime = i;
@ -295,8 +283,6 @@ int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) {
int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) {
if (numberOfFrames != i) {
numberOfFrames = i;
@ -317,8 +303,6 @@ int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) {
int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
if (dynamicRange != i) {
dynamicRange = i;
@ -335,8 +319,6 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
int UDPStandardImplementation::setTenGigaEnable(const bool b) {
if (tengigaEnable != b) {
tengigaEnable = b;
//side effects
@ -352,8 +334,6 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b) {
int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
if (fifoDepth != i) {
fifoDepth = i;
@ -368,8 +348,6 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG (logDEBUG) << "Setting receiver type";
DeleteMembers();
@ -415,8 +393,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
//create threads
for ( int i=0; i < numThreads; ++i ) {
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth));
dataProcessor.push_back(new DataProcessor(fifo[i], &status, &statusMutex, &fileFormatType, &fileWriteEnable,
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames));
dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable,
&callbackAction, rawDataReadyCallBack,pRawDataReady));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";
@ -436,6 +414,9 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->SetGeneralData(generalData);
}
SetThreadPriorities();
FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d);
return OK;
}
@ -444,7 +425,6 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
void UDPStandardImplementation::setDetectorPositionId(const int i){
FILE_LOG(logDEBUG) << __AT__ << " starting";
detID = i;
FILE_LOG(logINFO) << "Detector Position Id:" << detID;
@ -456,17 +436,19 @@ void UDPStandardImplementation::setDetectorPositionId(const int i){
void UDPStandardImplementation::resetAcquisitionCount() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
FILE_LOG (logINFO) << "Acquisition Count has been reset";
}
int UDPStandardImplementation::VerifyCallBackAction() {
/** file path and file index not required?? or need to include detector index? do they need the datasize? its given for write data anyway */
@ -496,7 +478,6 @@ int UDPStandardImplementation::VerifyCallBackAction() {
}
int UDPStandardImplementation::startReceiver(char *c) {
ResetParametersforNewMeasurement();
//listener
@ -541,7 +522,6 @@ int UDPStandardImplementation::startReceiver(char *c) {
void UDPStandardImplementation::stopReceiver(){
FILE_LOG(logDEBUG) << __AT__ << " called";
FILE_LOG(logINFO) << "Stopping Receiver";
//set status to transmitting
@ -554,7 +534,9 @@ void UDPStandardImplementation::stopReceiver(){
while(DataProcessor::GetRunningMask()){
usleep(5000);
}
while(DataStreamer::GetRunningMask()){
usleep(5000);
}
pthread_mutex_lock(&statusMutex);
status = RUN_FINISHED;
@ -579,6 +561,8 @@ void UDPStandardImplementation::stopReceiver(){
cprintf(GREEN, "Last Frame Number Caught :%lld\n",(long long int)listener[i]->GetLastFrameIndexCaught());
}
}
if(!activated)
cprintf(RED,"Note: Deactivated Receiver\n");
//callback
if (acquisitionFinishedCallBack)
acquisitionFinishedCallBack((int)(tot/numThreads), pAcquisitionFinished);
@ -597,8 +581,6 @@ void UDPStandardImplementation::stopReceiver(){
void UDPStandardImplementation::startReadout(){
FILE_LOG(logDEBUG) << __AT__ << " called";
if(status == RUNNING){
//needs to wait for packets only if activated
@ -647,7 +629,6 @@ void UDPStandardImplementation::startReadout(){
void UDPStandardImplementation::shutDownUDPSockets() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
}
@ -655,7 +636,6 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() {
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->CloseFiles();
}
@ -663,8 +643,6 @@ void UDPStandardImplementation::closeFiles() {
void UDPStandardImplementation::SetLocalNetworkParameters() {
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if (myDetectorType == EIGER)
return;
@ -691,10 +669,46 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
void UDPStandardImplementation::SetThreadPriorities() {
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it){
if ((*it)->SetThreadPriority(LISTENER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
return;
}
}
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it){
if ((*it)->SetThreadPriority(PROCESSOR_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
return;
}
}
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
if ((*it)->SetThreadPriority(STREAMER_PRIORITY) == FAIL) {
FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
return;
}
}
struct sched_param tcp_param;
tcp_param.sched_priority = TCP_PRIORITY;
if (pthread_setschedparam(pthread_self(),5 , &tcp_param) != EPERM) {
FILE_LOG(logWARNING) << "Unable to prioritize threads. Root privileges required for this option.";
return;
}
ostringstream osfn;
osfn << "Priorities set - "
"TCP:"<< TCP_PRIORITY <<
", Listener:" << LISTENER_PRIORITY <<
", Processor:" << PROCESSOR_PRIORITY;
if (dataStreamEnable)
osfn << ", Streamer:" << STREAMER_PRIORITY;
FILE_LOG(logINFO) << osfn.str();
}
int UDPStandardImplementation::SetupFifoStructure() {
//recalculate number of jobs & fifodepth, return if no change
if ((myDetectorType == GOTTHARD) || (myDetectorType == PROPIX)) {
@ -747,6 +761,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
//set the listener & dataprocessor threads to point to the right fifo
if(listener.size())listener[i]->SetFifo(fifo[i]);
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]);
if(dataStreamer.size())dataStreamer[i]->SetFifo(fifo[i]);
}
FILE_LOG (logINFO) << "Fifo structure(s) reconstructed";
@ -756,18 +771,21 @@ int UDPStandardImplementation::SetupFifoStructure() {
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
Listener::ResetRunningMask();
DataProcessor::ResetRunningMask();
DataStreamer::ResetRunningMask();
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
}
int UDPStandardImplementation::CreateUDPSockets() {
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
if (listener[i]->CreateUDPSockets() == FAIL) {
@ -785,7 +803,6 @@ int UDPStandardImplementation::CreateUDPSockets() {
int UDPStandardImplementation::SetupWriter() {
bool error = false;
for (unsigned int i = 0; i < dataProcessor.size(); ++i)
if (dataProcessor[i]->CreateNewFile(tengigaEnable,
@ -805,8 +822,6 @@ int UDPStandardImplementation::SetupWriter() {
void UDPStandardImplementation::StartRunning() {
//set running mask and post semaphore to start the inner loop in execution thread
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {
(*it)->StartRunning();
@ -816,4 +831,8 @@ void UDPStandardImplementation::StartRunning() {
(*it)->StartRunning();
(*it)->Continue();
}
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it){
(*it)->StartRunning();
(*it)->Continue();
}
}