fnum wrong, need to add file features

This commit is contained in:
Dhanya Maliakal
2017-02-07 08:52:27 +01:00
parent d95aaa2089
commit c89f6e649c
13 changed files with 364 additions and 94 deletions

View File

@@ -7,7 +7,12 @@
#include "DataProcessor.h"
#include "GeneralData.h"
#include "Fifo.h"
#include "BinaryFileWriter.h"
#ifdef HDF5C
#include "HDF5FileWriter.h"
#endif
#include <iostream>
#include <cstring>
@@ -23,14 +28,18 @@ uint64_t DataProcessor::RunningMask(0x0);
pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER;
const GeneralData* DataProcessor::generalData(0);
bool DataProcessor::acquisitionStartedFlag(false);
bool DataProcessor::measurementStartedFlag(false);
DataProcessor::DataProcessor(Fifo*& f) :
DataProcessor::DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m) :
ThreadObject(NumberofDataProcessors),
fifo(f),
status(s),
statusMutex(m),
numTotalFramesCaught(0),
numFramesCaught(0),
firstAcquisitionIndex(0),
@@ -51,8 +60,14 @@ DataProcessor::DataProcessor(Fifo*& f) :
DataProcessor::~DataProcessor() {
FILE_LOG (logDEBUG) << __AT__ << " called";
for (vector<FileWriter*>::const_iterator it = fileWriter.begin(); it != fileWriter.end(); ++it)
delete(*it);
fileWriter.clear();
ThreadObject::DestroyThread();
NumberofDataProcessors--;
}
/** static functions */
@@ -76,6 +91,15 @@ bool DataProcessor::GetMeasurementStartedFlag(){
}
void DataProcessor::SetGeneralData(GeneralData*& g) {
FILE_LOG (logDEBUG) << __AT__ << " called";
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
}
/** non static functions */
string DataProcessor::GetType(){
@@ -163,25 +187,42 @@ void DataProcessor::ThreadExecution() {
char* buffer=0;
fifo->PopAddress(buffer);
#ifdef FIFODEBUG
cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
if (!index) cprintf(BLUE,"DataProcessor %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
uint32_t numPackets = (uint32_t)(*((uint32_t*)buffer));
if(numPackets == DUMMY_PACKET_VALUE){
cprintf(GREEN,"DataProcessing %d: Got dummy value*****\n");
StopRunning();
fifo->FreeAddress(buffer);
if (numPackets == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
uint64_t fnum; uint32_t pnum; uint32_t snum; uint64_t bcid;
GetHeaderInfo(index,buffer+generalData->fifoBufferHeaderSize,16,fnum,pnum,snum,bcid);
cprintf(GREEN,"DataProcessing %d: fnum:%lld, pnum:%d\n",(long long int)fnum, pnum);
generalData->GetHeaderInfo(index,buffer+generalData->fifoBufferHeaderSize,16,fnum,pnum,snum,bcid);
if (!index) cprintf(BLUE,"DataProcessing %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum);
fifo->FreeAddress(buffer);
}
void DataProcessor::StopProcessing(char* buf) {
FILE_LOG (logDEBUG) << __AT__ << " called";
cprintf(BLUE,"%d: End of Processing\n", index);
fifo->FreeAddress(buf);
StopRunning();
if (!index) {
while (RunningMask)
usleep(5000);
pthread_mutex_lock(statusMutex);
*status = RUN_FINISHED;
pthread_mutex_unlock((statusMutex));
FILE_LOG(logINFO) << "Status: " << runStatusType(*status);
}
}
int DataProcessor::CreateNewFile() {
FILE_LOG (logDEBUG) << __AT__ << " called";
//create file fileWriter.push_back(new BinaryFileWriter(fileName))

View File

@@ -26,6 +26,7 @@ Fifo::Fifo(uint32_t fifoItemSize, uint32_t fifoDepth, bool &success):
Fifo::~Fifo() {
FILE_LOG (logDEBUG) << __AT__ << " called";
cprintf(RED,"destroying fifos\n");
DestroyFifos();
NumberofFifoClassObjects--;
}

View File

@@ -7,6 +7,7 @@
#include "Listener.h"
#include "GeneralData.h"
#include "Fifo.h"
#include "genericSocket.h"
@@ -24,13 +25,18 @@ uint64_t Listener::RunningMask(0x0);
pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER;
const GeneralData* Listener::generalData(0);
bool Listener::acquisitionStartedFlag(false);
bool Listener::measurementStartedFlag(false);
Listener::Listener(Fifo*& f) :
Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno) :
ThreadObject(NumberofListeners),
fifo(f),
status(s),
udpSocket(0),
udpPortNumber(portno),
numTotalPacketsCaught(0),
numPacketsCaught(0),
firstAcquisitionIndex(0),
@@ -74,6 +80,14 @@ bool Listener::GetMeasurementStartedFlag(){
return measurementStartedFlag;
}
void Listener::SetGeneralData(GeneralData*& g) {
FILE_LOG (logDEBUG) << __AT__ << " called";
generalData = g;
//#ifdef VERY_VERBOSE
generalData->Print();
//#endif
}
/** non static functions */
@@ -157,45 +171,90 @@ void Listener::ThreadExecution() {
char* buffer;
fifo->GetNewAddress(buffer);
#ifdef FIFODEBUG
cprintf(GREEN,"Listener %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
if (!index) cprintf(GREEN,"Listener %d, pop 0x%p buffer:%s\n", index,(void*)(buffer),buffer);
#endif
int rc;
int rc = 0;
while ((rc>0 && rc < generalData->packetSize)) {
rc = udpSocket->ReceiveDataOnly(buffer + generalData->fifoBufferHeaderSize,fifoBufferSize);
cprintf(BLUE,"Listening %d: rc: %d\n",index,rc);
uint64_t fnum; uint32_t pnum; uint32_t snum; uint64_t bcid;
GetHeaderInfo(index,buffer,16,fnum,pnum,snum,bcid);
cprintf(BLUE,"Listening %d: fnum:%lld, pnum:%d\n",(long long int)fnum, pnum);
*((uint32_t*)(buffer[ithread])) = (rc/generalData->packetSize);
//udpsocket doesnt exist
if (!udpSocket) {
FILE_LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not created or shut down earlier";
(*((uint32_t*)buffer)) = 0;
StopListening(buffer);
return;
}
if(rc <=0 ){
cprintf(BLUE,"Listening %d: Gonna send dummy value*****\n");
(*((uint32_t*)buffer)) = DUMMY_PACKET_VALUE;
StopRunning();
//get data
if (*status != TRANSMITTING) {
rc = udpSocket->ReceiveDataOnly(buffer + generalData->fifoBufferHeaderSize,generalData->fifoBufferSize);
if (!index) cprintf(GREEN,"Listening %d: rc: %d\n",index,rc);
(*((uint32_t*)buffer)) = ((rc <= 0) ? 0 : rc);
}
//done acquiring
if (*status == TRANSMITTING) {
StopListening(buffer);
return;
}
uint64_t fnum; uint32_t pnum; uint32_t snum; uint64_t bcid=0;
generalData->GetHeaderInfo(index,buffer + generalData->fifoBufferHeaderSize,16,fnum,pnum,snum,bcid);
if (!index) cprintf(GREEN,"Listening %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum);
//push into fifo
fifo->PushAddress(buffer);
}
int Listener::CreateUDPSockets(uint32_t portnumber, uint32_t packetSize, const char* eth, uint32_t headerPacketSize) {
void Listener::StopListening(char* buf) {
FILE_LOG (logDEBUG) << __AT__ << " called";
udpSocket = new genericSocket(portnumber, genericSocket::UDP, packetSize, eth, headerPacketSize);
cprintf(BLUE,"%d: End of Listening\n", index);
uint32_t numPackets = (uint32_t)(*((uint32_t*)buf));
//incomplete packets
if (numPackets > 0) {
fifo->PushAddress(buf);
fifo->GetNewAddress(buf);
#ifdef FIFODEBUG
if (!index) cprintf(GREEN,"Listener %d, Got incomplete, for dummy: pop 0x%p buffer:%s\n", index,(void*)(buf),buf);
#endif
}
//dummy
(*((uint32_t*)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
//#ifdef DEBUG4
if (!index) FILE_LOG(logINFO) << "Listening Thread (" << *udpPortNumber << ")"
" Packets caught: " << numPacketsCaught;
//#endif
}
int Listener::CreateUDPSockets(const char* eth) {
FILE_LOG (logDEBUG) << __AT__ << " called";
udpSocket = new genericSocket(*udpPortNumber, genericSocket::UDP,
generalData->packetSize, eth, generalData->headerPacketSize);
int iret = udpSocket->getErrorStatus();
if(!iret){
cout << "UDP port opened at port " << portnumber << endl;
cout << "UDP port opened at port " << *udpPortNumber << endl;
}else{
FILE_LOG(logERROR) << "Could not create UDP socket on port " << portnumber << " error: " << iret;
FILE_LOG(logERROR) << "Could not create UDP socket on port " << *udpPortNumber << " error: " << iret;
return FAIL;
}
return OK;
}
void Listener::ShutDownUDPSocket() {
FILE_LOG (logDEBUG) << __AT__ << " called";

View File

@@ -10,10 +10,6 @@
#include "DataProcessor.h"
#include "DataStreamer.h"
#include "Fifo.h"
#include "BinaryFileWriter.h"
#ifdef HDF5C
#include "HDF5FileWriter.h"
#endif
#include <cstdlib> //system
#include <cstring> //strcpy
@@ -39,11 +35,18 @@ void UDPStandardImplementation::DeleteMembers() {
FILE_LOG (logDEBUG) << __AT__ << " starting";
if (generalData) { delete generalData; generalData=0;}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
fileWriter.clear();
}
@@ -60,13 +63,11 @@ void UDPStandardImplementation::InitializeMembers() {
numThreads = 1;
numberofJobs = 1;
//*** mutex ***
pthread_mutex_init(&statusMutex,NULL);
//** class objects ***
generalData = 0;
listener.clear();
dataProcessor.clear();
dataStreamer.clear();
fifo.clear();
fileWriter.clear();
}
@@ -142,6 +143,9 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
numberofJobs = -1; //changes to imagesize has to be noted to recreate fifo structure
if (SetupFifoStructure() == FAIL)
return FAIL;
Listener::SetGeneralData(generalData);
DataProcessor::SetGeneralData(generalData);
}
FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable;
return OK;
@@ -177,14 +181,18 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
dataStreamEnable = enable;
//data sockets have to be created again as the client ones are
if (dataStreamer.size())
dataStreamer.clear();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
if (enable) {
for ( int i=0; i < numThreads; ++i ) {
dataStreamer.push_back(new DataStreamer());
if (DataStreamer::GetErrorMask()) {
cprintf(BG_RED,"Error: Could not create data callback threads\n");
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
delete(*it);
dataStreamer.clear();
return FAIL;
}
}
@@ -316,7 +324,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG (logDEBUG) << "Setting receiver type";
DeleteMembers();
DeleteMembers();cout<<"size of fifo:"<<fifo.size()<<endl;
InitializeMembers();
myDetectorType = d;
switch(myDetectorType) {
@@ -344,23 +352,30 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
case JUNGFRAU: generalData = new JungfrauData(); break;
default: break;
}
Listener::SetGeneralData(generalData);
DataProcessor::SetGeneralData(generalData);
numThreads = generalData->threadsPerReceiver;
fifoDepth = generalData->defaultFifoDepth;
//create fifo structure
numberofJobs = -1;
if (SetupFifoStructure() == FAIL) {
FILE_LOG (logERROR) << "Error: Could not allocate memory for fifo structure";
return FAIL;
}
//create threads
for ( int i=0; i < numThreads; ++i ) {
//create fifo structure
numberofJobs = -1;
if (SetupFifoStructure() == FAIL) {
FILE_LOG (logERROR) << "Error: Could not allocate memory for fifo (index:" << i << ")";
return FAIL;
}
//create threads
listener.push_back(new Listener(fifo[i]));
dataProcessor.push_back(new DataProcessor(fifo[i]));
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i]));
dataProcessor.push_back(new DataProcessor(fifo[i], &status, &statusMutex));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
delete(*it);
listener.clear();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
delete(*it);
dataProcessor.clear();
return FAIL;
}
}
@@ -407,6 +422,11 @@ int UDPStandardImplementation::startReceiver(char *c) {
}
}
//change status
pthread_mutex_lock(&statusMutex);
status = RUNNING;
pthread_mutex_unlock(&(statusMutex));
//Let Threads continue to be ready for acquisition
StartRunning();
@@ -428,11 +448,11 @@ void UDPStandardImplementation::stopReceiver(){
usleep(5000);
}
/* //change status
//change status
pthread_mutex_lock(&statusMutex);
status = IDLE;
pthread_mutex_unlock(&(statusMutex));
*/
FILE_LOG(logINFO) << "Receiver Stopped";
FILE_LOG(logINFO) << "Status: " << runStatusType(status);
cout << endl << endl;
@@ -489,10 +509,10 @@ void UDPStandardImplementation::startReadout(){
}
}
/*//set status
//set status
pthread_mutex_lock(&statusMutex);
status = TRANSMITTING;
pthread_mutex_unlock(&statusMutex);*/
pthread_mutex_unlock(&statusMutex);
FILE_LOG(logINFO) << "Status: Transmitting";
}
@@ -518,7 +538,6 @@ void UDPStandardImplementation::closeFiles() {
void UDPStandardImplementation::SetLocalNetworkParameters() {
FILE_LOG (logDEBUG) << __AT__ << " called";
@@ -553,7 +572,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
//recalculate number of jobs & fifodepth, return if no change
if ((myDetectorType == GOTTHARD) || (myDetectorType = PROPIX)) {
if ((myDetectorType == GOTTHARD) || (myDetectorType == PROPIX)) {
int oldnumberofjobs = numberofJobs;
//listen to only n jobs at a time
@@ -585,24 +604,27 @@ int UDPStandardImplementation::SetupFifoStructure() {
numberofJobs = 1;
//create fifostructure
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
for ( int i=0; i < numThreads; i++ ) {
for ( int i = 0; i < numThreads; i++ ) {
//create fifo structure
bool success = true;
fifo.push_back( new Fifo (
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize),
fifoDepth, success));
if (!success) {
cprintf(BG_RED,"Error: Could not allocate memory for listening \n");
cprintf(BG_RED,"Error: Could not allocate memory for fifo structure of index %d\n", i);
for (vector<Fifo*>::const_iterator it = fifo.begin(); it != fifo.end(); ++it)
delete(*it);
fifo.clear();
return FAIL;
}
//set the listener & dataprocessor threads to point to the right fifo
listener[i]->SetFifo(fifo[i]);
dataProcessor[i]->SetFifo(fifo[i]);
if(listener.size())listener[i]->SetFifo(fifo[i]);
if(dataProcessor.size())dataProcessor[i]->SetFifo(fifo[i]);
}
FILE_LOG (logINFO) << "Fifo structure(s) reconstructed";
return OK;
}
@@ -633,8 +655,7 @@ int UDPStandardImplementation::CreateUDPSockets() {
}
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
if (listener[i]->CreateUDPSockets(udpPortNum[i], generalData->packetSize,
(strlen(eth)?eth:NULL), generalData->headerPacketSize) == FAIL) {
if (listener[i]->CreateUDPSockets((strlen(eth)?eth:NULL)) == FAIL) {
error = true;
break;
}

View File

@@ -101,7 +101,7 @@ int main(int argc, char *argv[]) {
//receiver->registerCallBackRawDataReady(rawDataReadyCallBack,NULL);
/*
//start tcp server thread
if(receiver->start() == slsReceiverDefs::OK){
FILE_LOG(logDEBUG1) << "DONE!" << endl;
@@ -113,7 +113,7 @@ int main(int argc, char *argv[]) {
//stop tcp server thread, stop udp socket
receiver->stop();
}
*/
deleteReceiver(receiver);
cout << "Goodbye!" << endl;
return 0;

View File

@@ -127,7 +127,6 @@ slsReceiver::slsReceiver(int argc, char *argv[], int &success){
udp_interface = UDPInterface::create(udp_interface_type);
udp_interface->configure(configuration_map);
#endif
udp_interface = UDPInterface::create("standard");
tcpipInterface = new slsReceiverTCPIPInterface(success, udp_interface, tcpip_port_no);
}
}

View File

@@ -23,8 +23,8 @@ using namespace std;
slsReceiverTCPIPInterface::~slsReceiverTCPIPInterface() {
/*stop();
if(mySock) {delete mySock; mySock=NULL;}*/
stop();
if(mySock) {delete mySock; mySock=NULL;}
}
slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface* rbase, int pn):
@@ -55,7 +55,7 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
success=OK;
/*//create socket
//create socket
if(success == OK){
mySock = new MySocketTCP(port_no);
if (mySock->getErrorStatus()) {
@@ -73,7 +73,7 @@ slsReceiverTCPIPInterface::slsReceiverTCPIPInterface(int &success, UDPInterface*
cout << "Function table assigned." << endl;
#endif
}
}*/
}
}