master hdf5 does not work, but so far good

This commit is contained in:
Dhanya Maliakal
2017-02-16 12:48:15 +01:00
parent 54f92fb26c
commit 2cd38fefcd
14 changed files with 958 additions and 225 deletions

View File

@ -5,23 +5,28 @@
***********************************************/
#include "BinaryFile.h"
#include "receiver_defs.h"
#include <iostream>
#include <iomanip>
#include <string.h>
using namespace std;
FILE* BinaryFile::masterfd = 0;
BinaryFile::BinaryFile(int ind, char* fname, char* fpath, uint64_t* findex,
bool* frindexenable, bool* owenable, int* dindex, int* nunits, uint32_t maxf):
File(ind, fname, fpath, findex, frindexenable, owenable, dindex, nunits),
maxFramesPerFile(maxf)
bool* frindexenable, bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr, uint32_t maxf):
File(ind, fname, fpath, findex, frindexenable, owenable, dindex, nunits, nf, dr),
maxFramesPerFile(maxf),
filefd(0)
{
printf("%d BinaryFile constructor\n",index);
#ifdef VERBOSE
PrintMembers();
#endif
}
BinaryFile::~BinaryFile() {
printf("%d BinaryFile destructor\n",index);
}
void BinaryFile::PrintMembers() {
@ -29,7 +34,7 @@ void BinaryFile::PrintMembers() {
printf("Max Frames Per File: %d\n",maxFramesPerFile);
}
slsReceiverDefs::fileFormat BinaryFile::GetType() {
slsReceiverDefs::fileFormat BinaryFile::GetFileType() {
return BINARY;
}
@ -41,15 +46,45 @@ int BinaryFile::CreateFile(uint64_t fnum) {
currentFileName = CreateFileName(filePath, fileNamePrefix, *fileIndex,
*frameIndexEnable, fnum, *detIndex, *numUnitsPerDetector, index);
printf("%d Binary File: %s\n", index, currentFileName.c_str());
if (CreateDataFile(filefd, *overWriteEnable, currentFileName) == FAIL)
return FAIL;
printf("%d Binary File created: %s\n", index, currentFileName.c_str());
return OK;
}
void BinaryFile::CloseFile() {
printf("%d Closing File: %s\n", index, currentFileName.c_str());
void BinaryFile::CloseCurrentFile() {
CloseDataFile(filefd);
}
void BinaryFile::CloseAllFiles() {
CloseDataFile(filefd);
if (master)
CloseCommonDataFiles();
}
int BinaryFile::WriteToFile(char* buffer, int buffersize, uint64_t fnum) {
if (WriteDataFile(filefd, buffer, buffersize, fnum) == buffersize)
return OK;
cprintf(RED,"%d Error: Write to file failed for image number %lld\n", index, (long long int)fnum);
return FAIL;
}
int BinaryFile::CreateCommonFiles(bool en, uint32_t size,
uint32_t nx, uint32_t ny, uint64_t at, uint64_t ap) {
if (master) {
string masterFileName="";
CreateCommonFileNames(masterFileName, filePath, fileNamePrefix, *fileIndex);
printf("Master HDF5 File: %s\n", masterFileName.c_str());
//create common files
return CreateCommonDataFiles(masterFileName, *overWriteEnable,
en, size, nx, ny, at, ap);
}
return OK;
}
/*** static function ***/
string BinaryFile::CreateFileName(char* fpath, char* fnameprefix, uint64_t findex,
bool frindexenable, uint64_t fnum, int dindex, int numunits, int unitindex) {
ostringstream osfn;
@ -61,10 +96,100 @@ string BinaryFile::CreateFileName(char* fpath, char* fnameprefix, uint64_t finde
return osfn.str();
}
int BinaryFile::CreateDataFile(bool owenable, char* fname) {
/*** static function ***/
int BinaryFile::CreateDataFile(FILE*& fd, bool owenable, string fname) {
if(!owenable){
if (NULL == (fd = fopen((const char *) fname.c_str(), "wx"))){
FILE_LOG(logERROR) << "Could not create/overwrite file" << fname;
fd = 0;
return FAIL;
}
}else if (NULL == (fd = fopen((const char *) fname.c_str(), "w"))){
FILE_LOG(logERROR) << "Could not create file" << fname;
fd = 0;
return FAIL;
}
//setting file buffer size to 16mb
setvbuf(fd,NULL,_IOFBF,FILE_BUFFER_SIZE);
return OK;
}
void BinaryFile::CloseDataFile() {
/*** static function ***/
void BinaryFile::CloseDataFile(FILE*& fd) {
if (fd)
fclose(fd);
fd = 0;
}
/*** static function ***/
int BinaryFile::WriteDataFile(FILE* fd, char* buf, int bsize, uint64_t fnum) {
if (!fd)
return 0;
return fwrite(buf, 1, bsize, fd);
}
void BinaryFile::CreateCommonFileNames(string& m, char* fpath, char* fnameprefix, uint64_t findex) {
ostringstream osfn;
osfn << fpath << "/" << fnameprefix;
osfn << "_master";
osfn << "_" << findex;
osfn << ".raw";
m = osfn.str();
}
void BinaryFile::CloseCommonDataFiles() {
if(masterfd)
delete masterfd;
masterfd = 0;
}
int BinaryFile::CreateCommonDataFiles(string m, bool owenable,
bool tengigaEnable, uint32_t imageSize, uint32_t nPixelsX, uint32_t nPixelsY,
uint64_t acquisitionTime, uint64_t acquisitionPeriod) {
if(!owenable){
if (NULL == (masterfd = fopen((const char *) m.c_str(), "wx"))){
cprintf(RED,"Error in creating binary master file %s\n",m.c_str());
masterfd = 0;
return FAIL;
}
}else if (NULL == (masterfd = fopen((const char *) m.c_str(), "w"))){
cprintf(RED,"Error in creating binary master file %s\n",m.c_str());
masterfd = 0;
return FAIL;
}
time_t t = time(0);
char message[MAX_STR_LENGTH];
sprintf(message,
"Version\t\t: %.1f\n"
"Dynamic Range\t: %d\n"
"Ten Giga\t: %d\n"
"Image Size\t: %d bytes\n"
"x\t\t: %d pixels\n"
"y\t\t: %d pixels\n"
"Total Frames\t: %lld\n"
"Exptime (ns)\t: %lld\n"
"Period (ns)\t: %lld\n"
"Timestamp\t: %s\n\n",
BINARY_WRITER_VERSION,
*dynamicRange,
tengigaEnable,
imageSize,
nPixelsX,
nPixelsY,
(long long int)numImages,
(long long int)acquisitionTime,
(long long int)acquisitionPeriod,
ctime(&t));
if (strlen(message) > MAX_STR_LENGTH) {
cprintf(BG_RED,"Master File Size %d is greater than max str size %d\n",
(int)strlen(message), MAX_STR_LENGTH);
return FAIL;
}
if (fwrite((void*)message, 1, strlen(message), masterfd) != strlen(message))
return FAIL;
CloseDataFile(masterfd);
return OK;
}

View File

@ -28,14 +28,13 @@ uint64_t DataProcessor::RunningMask(0x0);
pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER;
const GeneralData* DataProcessor::generalData(0);
DataProcessor::DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m, fileFormat* ftype, bool* fwenable,
int* cbaction,
void (*dataReadycb)(int, char*, int, FILE*, char*, void*),
void *pDataReadycb) :
ThreadObject(NumberofDataProcessors),
generalData(0),
fifo(f),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
@ -58,6 +57,7 @@ DataProcessor::DataProcessor(Fifo*& f, runStatus* s, pthread_mutex_t* m, fileFor
ErrorMask ^= (1<<index);
pthread_mutex_unlock(&Mutex);
}
NumberofDataProcessors++;
FILE_LOG (logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors << endl;
}
@ -79,14 +79,6 @@ uint64_t DataProcessor::GetRunningMask() {
return RunningMask;
}
void DataProcessor::SetGeneralData(GeneralData*& g) {
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
}
/** non static functions */
/** getters */
string DataProcessor::GetType(){
@ -136,11 +128,9 @@ void DataProcessor::StopRunning() {
void DataProcessor::SetFifo(Fifo*& f) {
fifo = f;
}
void DataProcessor::ResetParametersforNewAcquisition() {
numTotalFramesCaught = 0;
firstAcquisitionIndex = 0;
@ -180,39 +170,41 @@ void DataProcessor::RecordFirstIndices(uint64_t fnum) {
}
void DataProcessor::SetMaxFramesPerFile() {
if (file->GetType() == BINARY)
void DataProcessor::SetGeneralData(GeneralData* g) {
generalData = g;
#ifdef VERY_VERBOSE
generalData->Print();
#endif
if (!file) {
cprintf(RED, "Error Calling SetGeneralData with no file object. Should not be here\n");
return;
}
if (file->GetFileType() == BINARY)
file->SetMaxFramesPerFile(generalData->maxFramesPerFile);
else if (file->GetFileType() == HDF5) {
file->SetNumberofPixels(generalData->nPixelsX, generalData->nPixelsY);
}
}
void DataProcessor::SetFileFormat(const fileFormat f) {
if (*fileFormatType != f) {
switch(f){
#ifdef HDF5C
case HDF5:
*fileFormatType = f;
#endif
default:
*fileFormatType = f;
break;
}
if (file->GetFileType() != f) {
//remember the pointer values before they are destroyed
char* fname=0; char* fpath=0; uint64_t* findex=0; bool* frindexenable=0;
bool* fwenable=0; bool* owenable=0; int* dindex=0; int* nunits=0;
file->GetMemberPointerValues(fname, fpath, findex, frindexenable, owenable, dindex, nunits);
SetupFileWriter(fname, fpath, findex, frindexenable, owenable, dindex, nunits);
bool* owenable=0; int* dindex=0; int* nunits=0; uint64_t* nf = 0; uint32_t* dr = 0;
file->GetMemberPointerValues(fname, fpath, findex, frindexenable, owenable, dindex, nunits, nf, dr);
//create file writer with same pointers
SetupFileWriter(fname, fpath, findex, frindexenable, owenable, dindex, nunits, nf, dr);
}
}
void DataProcessor::SetupFileWriter(char* fname, char* fpath, uint64_t* findex,
bool* frindexenable, bool* owenable, int* dindex, int* nunits)
bool* frindexenable, bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr, GeneralData* g)
{
if (g)
generalData = g;
if (file)
delete file;
@ -221,19 +213,21 @@ void DataProcessor::SetupFileWriter(char* fname, char* fpath, uint64_t* findex,
#ifdef HDF5C
case HDF5:
file = new HDF5File(index, fname, fpath, findex,
frindexenable, owenable, dindex, nunits);
frindexenable, owenable, dindex, nunits, nf, dr, generalData->nPixelsX, generalData->nPixelsY);
break;
#endif
default:
file = new BinaryFile(index, fname, fpath, findex,
frindexenable, owenable, dindex, nunits, generalData->maxFramesPerFile);
frindexenable, owenable, dindex, nunits, nf, dr, generalData->maxFramesPerFile);
break;
}
}
int DataProcessor::CreateNewFile() {
if (!file)
int DataProcessor::CreateNewFile(bool en, uint64_t nf, uint64_t at, uint64_t ap) {
file->CloseAllFiles();
if (file->CreateCommonFiles(en, generalData->imageSize, generalData->nPixelsX, generalData->nPixelsY,
at, ap) == FAIL)
return FAIL;
if (file->CreateFile(currentFrameIndex) == FAIL)
return FAIL;
@ -241,10 +235,9 @@ int DataProcessor::CreateNewFile() {
}
void DataProcessor::CloseFile() {
void DataProcessor::CloseFiles() {
if (file)
file->CloseFile();
file->CloseAllFiles();
}
@ -272,7 +265,7 @@ void DataProcessor::ThreadExecution() {
void DataProcessor::StopProcessing(char* buf) {
fifo->FreeAddress(buf);
CloseFile();
file->CloseCurrentFile();
StopRunning();
cprintf(BLUE,"%d: Processing Completed\n", index);
}
@ -282,11 +275,20 @@ void DataProcessor::ProcessAnImage(char* buf) {
numFramesCaught++;
numTotalFramesCaught++;
uint64_t fnum = (*((uint64_t*)buf));
//#ifdef VERBOSE
#ifdef VERBOSE
if (!index) cprintf(BLUE,"DataProcessing %d: fnum:%lld\n", index, (long long int)fnum);
//#endif
if (!measurementStartedFlag)
#endif
if (!measurementStartedFlag) {
#ifdef VERBOSE
if (!index) cprintf(BLUE,"DataProcessing %d: fnum:%lld\n", index, (long long int)fnum);
#endif
RecordFirstIndices(fnum);
}
if (fileWriteEnable && *callbackAction == DO_EVERYTHING)
file->WriteToFile(buf + FIFO_HEADER_NUMBYTES, generalData->fifoBufferSize + FILE_FRAME_HEADER_SIZE, fnum-firstMeasurementIndex);
}

View File

@ -11,7 +11,7 @@ using namespace std;
File::File(int ind, char* fname, char* fpath, uint64_t* findex,
bool* frindexenable, bool* owenable, int* dindex, int* nunits):
bool* frindexenable, bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr):
index(ind),
fileNamePrefix(fname),
filePath(fpath),
@ -19,14 +19,14 @@ File::File(int ind, char* fname, char* fpath, uint64_t* findex,
frameIndexEnable(frindexenable),
overWriteEnable(owenable),
detIndex(dindex),
numUnitsPerDetector(nunits)
numUnitsPerDetector(nunits),
numImages(nf),
dynamicRange(dr)
{
printf("%d File constructor\n",index);
master = index?false:true;
}
File::~File() {
printf("%d File Destructor\n", index);
}
File::~File() {}
string File::GetCurrentFileName() {
return currentFileName;
@ -53,8 +53,8 @@ void File::PrintMembers() {
}
void File::GetMemberPointerValues(char* fname, char* fpath, uint64_t* findex,
bool* frindexenable, bool* owenable, int* dindex, int* nunits)
void File::GetMemberPointerValues(char*& fname, char*& fpath, uint64_t*& findex,
bool*& frindexenable, bool*& owenable, int*& dindex, int*& nunits, uint64_t*& nf, uint32_t* dr)
{
fname = fileNamePrefix;
fpath = filePath;
@ -63,4 +63,6 @@ void File::GetMemberPointerValues(char* fname, char* fpath, uint64_t* findex,
owenable = overWriteEnable;
dindex = detIndex;
nunits = numUnitsPerDetector;
nf = numImages;
dr = dynamicRange;
}

View File

@ -4,19 +4,36 @@
* creates/closes the file and writes data to it
***********************************************/
//#define HDF5C /*********************?????????????????* Need to remove this in the header file to o*************************************?????????????????????***/
//#ifdef HDF5C
#include "HDF5File.h"
#include "receiver_defs.h"
#include <iostream>
#include <iomanip>
using namespace std;
pthread_mutex_t HDF5File::Mutex = PTHREAD_MUTEX_INITIALIZER;
H5File* HDF5File::masterfd = 0;
H5File* HDF5File::virtualfd = 0;
HDF5File::HDF5File(int ind, char* fname, char* fpath, uint64_t* findex,
bool* frindexenable, bool* owenable, int* dindex, int* nunits):
File(ind, fname, fpath, findex, frindexenable, owenable, dindex, nunits)
bool* frindexenable, bool* owenable, int* dindex, int* nunits, uint64_t* nf, uint32_t* dr,
int nx, int ny):
File(ind, fname, fpath, findex, frindexenable, owenable, dindex, nunits, nf, dr),
filefd(0),
dataspace(0),
dataset(0),
datatype(PredType::STD_U16LE),
nPixelsX(nx),
nPixelsY(ny)
{
printf("%d HDF5File constructor\n",index);
#ifdef VERBOSE
PrintMembers();
#endif
}
@ -26,25 +43,88 @@ HDF5File::~HDF5File() {
void HDF5File::PrintMembers() {
File::PrintMembers();
printf("\nHDF5 Print Members \n");
UpdateDataType();
if (datatype == PredType::STD_U8LE) {
printf("Data Type: 4 or 8\n");
} else if (datatype == PredType::STD_U16LE) {
printf("Data Type: 16\n");
} else if (datatype == PredType::STD_U32LE) {
printf("Data Type: 32\n");
} else {
cprintf(BG_RED,"unknown data type\n");
}
}
void HDF5File::SetNumberofPixels(uint32_t nx, uint32_t ny) {
nPixelsX = nx;
nPixelsY = ny;
}
slsReceiverDefs::fileFormat HDF5File::GetType() {
slsReceiverDefs::fileFormat HDF5File::GetFileType() {
return HDF5;
}
void HDF5File::UpdateDataType() {
switch(*dynamicRange){
case 4: datatype = PredType::STD_U8LE; break;
case 8: datatype = PredType::STD_U8LE; break;
case 16: datatype = PredType::STD_U16LE; break;
case 32: datatype = PredType::STD_U32LE; break;
default: cprintf(BG_RED,"unknown dynamic range\n");
datatype = PredType::STD_U16LE; break;
}
}
int HDF5File::CreateFile(uint64_t fnum) {
currentFileName = CreateFileName(filePath, fileNamePrefix, *fileIndex,
*frameIndexEnable, fnum, *detIndex, *numUnitsPerDetector, index);
//create file
UpdateDataType();
if (CreateDataFile(index, *overWriteEnable, *numImages, currentFileName, fnum,
((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), nPixelsY,
datatype, filefd, dataspace, dataset) == FAIL)
return FAIL;
printf("%d HDF5 File: %s\n", index, currentFileName.c_str());
return OK;
}
void HDF5File::CloseFile() {
void HDF5File::CloseCurrentFile() {
CloseDataFile(filefd, dataspace, dataset);
}
void HDF5File::CloseAllFiles() {
CloseDataFile(filefd, dataspace, dataset);
if (master)
CloseCommonDataFiles();
}
int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum) {
if (WriteDataFile(index, buffer, *numImages,
((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), nPixelsY, fnum,
dataspace, dataset, datatype) == OK)
return OK;
cprintf(RED,"%d Error: Write to file failed\n", index);
return FAIL;
}
int HDF5File::CreateCommonFiles(bool en, uint32_t size,
uint32_t nx, uint32_t ny, uint64_t at, uint64_t ap) {
if (master) {
string masterFileName="", virtualFileName="";
CreateCommonFileNames(masterFileName, virtualFileName, filePath, fileNamePrefix, *fileIndex);
printf("Master HDF5 File: %s\nVirtual HDF5 File: %s\n", masterFileName.c_str(), virtualFileName.c_str());
//create common files
return CreateCommonDataFiles(masterFileName, virtualFileName, *overWriteEnable,
en, size, nx, ny, at, ap);
}
return OK;
}
/*** static function ***/
string HDF5File::CreateFileName(char* fpath, char* fnameprefix, uint64_t findex,
bool frindexenable, uint64_t fnum, int dindex, int numunits, int unitindex) {
ostringstream osfn;
@ -56,10 +136,208 @@ string HDF5File::CreateFileName(char* fpath, char* fnameprefix, uint64_t findex,
return osfn.str();
}
int HDF5File::CreateDataFile(bool owenable, char* fname) {
/*** static function ***/
int HDF5File::CreateDataFile(int ind, bool owenable, uint64_t numf, string fname, uint64_t fnum, int nx, int ny,
DataType dtype, H5File*& fd, DataSpace*& dspace, DataSet*& dset) {
pthread_mutex_lock(&Mutex);
try {
Exception::dontPrint(); //to handle errors
//file
FileAccPropList flist;
flist.setFcloseDegree(H5F_CLOSE_STRONG);
if(!owenable)
fd = new H5File( fname.c_str(), H5F_ACC_EXCL, NULL,flist );
else
fd = new H5File( fname.c_str(), H5F_ACC_TRUNC, NULL, flist );
//attributes - version
double dValue=HDF5_WRITER_VERSION;
DataSpace dataspace_attr = DataSpace (H5S_SCALAR);
Attribute attribute = fd->createAttribute("version",PredType::NATIVE_DOUBLE, dataspace_attr);
attribute.write(PredType::NATIVE_DOUBLE, &dValue);
//dataspace
char dsetname[100];
hsize_t srcdims[3] = {numf,ny,nx};
dspace = new DataSpace (3,srcdims);
sprintf(dsetname, "/data_%012lld", (long long int)fnum);
//dataset
//create chunked dataset if greater than max_chunked_images
if(numf > MAX_CHUNKED_IMAGES){
DSetCreatPropList plist;
hsize_t chunk_dims[3] ={MAX_CHUNKED_IMAGES, ny, nx};
plist.setChunk(3, chunk_dims);
dset = new DataSet (fd->createDataSet(dsetname, dtype, *dspace, plist));
}else
dset = new DataSet (fd->createDataSet(dsetname, dtype, *dspace));
}
catch(Exception error){
cprintf(RED,"Error in creating HDF5 handles in object %d\n",ind);
error.printError();
fd->close();
pthread_mutex_unlock(&Mutex);
return FAIL;
}
pthread_mutex_unlock(&Mutex);
return OK;
}
void HDF5File::CloseDataFile() {
/*** static function ***/
void HDF5File::CloseDataFile(H5File*& fd, DataSpace*& dp, DataSet*& ds) {
pthread_mutex_lock(&Mutex);
try {
Exception::dontPrint(); //to handle errors
if(dp) {delete dp; dp = 0;}
if(ds) {delete ds; ds = 0;}
if(fd) {delete fd; fd = 0;}
} catch(Exception error) {
cprintf(RED,"Error in closing HDF5 handles\n");
error.printError();
}
pthread_mutex_unlock(&Mutex);
}
/*** static function ***/
int HDF5File::WriteDataFile(int ind, char* buf, uint64_t numImages, int nx, int ny, uint64_t fnum,
DataSpace* dspace, DataSet* dset, DataType dtype) {
pthread_mutex_lock(&Mutex);
hsize_t count[3] = {1, ny,nx};
hsize_t start[3] = {fnum%numImages, 0 , 0};
hsize_t dims2[2]={ny,nx};
try{
Exception::dontPrint(); //to handle errors
dspace->selectHyperslab( H5S_SELECT_SET, count, start);
DataSpace memspace(2,dims2);
dset->write(buf, dtype, memspace, *dspace);
memspace.close();
}
catch(Exception error){
cprintf(RED,"Error in writing to file in object %d\n",ind);
error.printError();
pthread_mutex_unlock(&Mutex);
return FAIL;
}
pthread_mutex_unlock(&Mutex);
return OK;
}
void HDF5File::CreateCommonFileNames(string& m, string& v, char* fpath, char* fnameprefix, uint64_t findex) {
ostringstream osfn;
osfn << fpath << "/" << fnameprefix;
osfn << "_master";
osfn << "_" << findex;
osfn << ".h5";
m = osfn.str();
v.assign(m);
v.replace(v.find("_master", 0), 7, "_virtual");
}
void HDF5File::CloseCommonDataFiles() {
pthread_mutex_lock(&Mutex);
try {
Exception::dontPrint(); //to handle errors
if(masterfd) {delete masterfd; masterfd = 0;}
if(virtualfd) {delete virtualfd; virtualfd = 0;}
} catch(Exception error) {
cprintf(RED,"Error in closing common HDF5 handles\n");
error.printError();
}
pthread_mutex_unlock(&Mutex);
}
int HDF5File::CreateCommonDataFiles(string m, string v, bool owenable,
bool tengigaEnable, uint32_t imageSize, uint32_t nPixelsX, uint32_t nPixelsY,
uint64_t acquisitionTime, uint64_t acquisitionPeriod) {
pthread_mutex_lock(&Mutex);
try {
//Exception::dontPrint(); //to handle errors
FileAccPropList flist;
flist.setFcloseDegree(H5F_CLOSE_STRONG);
if(!owenable)
masterfd = new H5File( m.c_str(), H5F_ACC_EXCL, NULL, flist );
else
masterfd = new H5File( m.c_str(), H5F_ACC_TRUNC, NULL, flist );
//variables
DataSpace dataspace = DataSpace (H5S_SCALAR);
Attribute attribute;
DataSet dataset;
int iValue=0;
double dValue=0;
StrType strdatatype(PredType::C_S1,256);
//create attributes
//version
dValue=HDF5_WRITER_VERSION;
attribute = masterfd->createAttribute("version",PredType::NATIVE_DOUBLE, dataspace);
attribute.write(PredType::NATIVE_DOUBLE, &dValue);
//Create a group in the file
Group group1( masterfd->createGroup( "entry" ) );
Group group2( group1.createGroup("data") );
Group group3( group1.createGroup("instrument") );
Group group4( group3.createGroup("beam") );
Group group5( group3.createGroup("detector") );
Group group6( group1.createGroup("sample") );
//Dynamic Range
dataset = group5.createDataSet ( "dynamic range", PredType::NATIVE_INT, dataspace );
dataset.write ( dynamicRange, PredType::NATIVE_INT);
attribute = dataset.createAttribute("unit",strdatatype, dataspace);
attribute.write(strdatatype, string("bits"));
//Ten Giga
iValue = tengigaEnable;
dataset = group5.createDataSet ( "ten giga enable", PredType::NATIVE_INT, dataspace );
dataset.write ( &iValue, PredType::NATIVE_INT);
//Image Size
dataset = group5.createDataSet ( "image size", PredType::NATIVE_INT, dataspace );
dataset.write ( &imageSize, PredType::NATIVE_INT);
attribute = dataset.createAttribute("unit",strdatatype, dataspace);
attribute.write(strdatatype, string("bytes"));
//x
dataset = group5.createDataSet ( "number of pixels in x axis", PredType::NATIVE_INT, dataspace );
dataset.write ( &nPixelsX, PredType::NATIVE_INT);
//y
dataset = group5.createDataSet ( "number of pixels in y axis", PredType::NATIVE_INT, dataspace );
dataset.write ( &nPixelsY, PredType::NATIVE_INT);
//Total Frames
dataset = group5.createDataSet ( "total frames", PredType::STD_U64LE, dataspace );
dataset.write ( &numImages, PredType::STD_U64LE);
//Exptime
dataset = group5.createDataSet ( "exposure time", PredType::STD_U64LE, dataspace );
dataset.write ( &acquisitionTime, PredType::STD_U64LE);
attribute = dataset.createAttribute("unit",strdatatype, dataspace);
attribute.write(strdatatype, string("ns"));
//Period
dataset = group5.createDataSet ( "acquisition period", PredType::STD_U64LE, dataspace );
dataset.write ( &acquisitionPeriod, PredType::STD_U64LE);
attribute = dataset.createAttribute("unit",strdatatype, dataspace);
attribute.write(strdatatype, string("ns"));
//Timestamp
time_t t = time(0);
dataset = group5.createDataSet ( "timestamp", strdatatype, dataspace );
dataset.write ( string(ctime(&t)), strdatatype );
masterfd->close();
} catch(Exception error) {
cprintf(RED,"Error in creating common HDF5 handles\n");
error.printError();
pthread_mutex_unlock(&Mutex);
return FAIL;
}
pthread_mutex_unlock(&Mutex);
return OK;
}
//#endif

View File

@ -44,7 +44,8 @@ Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e) :
currentFrameIndex(0),
lastCaughtFrameIndex(0),
carryOverFlag(0),
carryOverPacket(0)
carryOverPacket(0),
listeningPacket(0)
{
if(ThreadObject::CreateThread()){
pthread_mutex_lock(&Mutex);
@ -58,6 +59,7 @@ Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e) :
Listener::~Listener() {
if (carryOverPacket) delete carryOverPacket;
if (listeningPacket) delete listeningPacket;
ThreadObject::DestroyThread();
NumberofListeners--;
}
@ -145,6 +147,9 @@ void Listener::ResetParametersforNewMeasurement(){
if (carryOverPacket)
delete carryOverPacket;
carryOverPacket = new char[generalData->packetSize];
if (listeningPacket)
delete listeningPacket;
listeningPacket = new char[generalData->packetSize];
if(RunningMask){
pthread_mutex_lock(&Mutex);
@ -167,7 +172,7 @@ void Listener::RecordFirstIndices(uint64_t fnum) {
acquisitionStartedFlag = true;
firstAcquisitionIndex = fnum;
}
if (!index) cprintf(GREEN,"%d First Acquisition Index:%lld\n"
if (!index) cprintf(MAGENTA,"%d First Acquisition Index:%lld\n"
"%d First Measurement Index:%lld\n",
index, (long long int)firstAcquisitionIndex,
index, (long long int)firstMeasurementIndex);
@ -266,12 +271,12 @@ void Listener::StopListening(char* buf) {
uint32_t Listener::ListenToAnImage(char* buf) {
uint32_t rc = 0;
uint64_t fnum = 0; uint32_t pnum = 0;
uint32_t offset = 0;
uint32_t currentpnum = 0;
int psize = generalData->packetSize;
int dsize = generalData->dataSize;
//reset to -1
memset(buf,0xFF,psize);
memset(buf,0xFF,dsize);
//look for carry over
if (carryOverFlag) {
@ -281,45 +286,41 @@ uint32_t Listener::ListenToAnImage(char* buf) {
return generalData->imageSize;
}
carryOverFlag = false;
memcpy(buf,carryOverPacket, psize);
offset += psize;
memcpy(buf + (pnum * dsize), carryOverPacket + generalData->headerSizeinPacket, dsize);
(*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum;
}
while (offset < generalData->fifoBufferSize) {
while (pnum < (generalData->packetsPerFrame-1)) {
//listen to new packet
rc += udpSocket->ReceiveDataOnly(buf + offset);
rc += udpSocket->ReceiveDataOnly(listeningPacket);
if (rc <= 0) return 0;
//update parameters
numPacketsCaught++; //record immediately to get more time before socket shutdown
numTotalPacketsCaught++;
generalData->GetHeaderInfo(index,buf + offset,fnum,pnum);
generalData->GetHeaderInfo(index,listeningPacket,fnum,pnum);
lastCaughtFrameIndex = fnum;
//#ifdef VERBOSE
if (!index && !pnum) cprintf(BLUE,"Listening %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum);
if (!index && !pnum) cprintf(GREEN,"Listening %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum);
//#endif
if (!measurementStartedFlag)
RecordFirstIndices(fnum);
//future packet
if(fnum != currentFrameIndex) {
carryOverFlag = true;
memcpy(carryOverPacket,buf + offset, psize);
memcpy(carryOverPacket,listeningPacket, generalData->packetSize);
return generalData->imageSize;
}
//copy packet and update fnum
memcpy(buf + (pnum * dsize), listeningPacket + generalData->headerSizeinPacket, dsize);
(*((uint64_t*)(buf - FILE_FRAME_HEADER_SIZE))) = fnum;
//future packet of same frame
if(pnum != currentpnum) {
memcpy(buf + (pnum * psize), buf + offset, psize);
memset(buf + offset, 0xFF, psize);
}
//update offset & current frame index
offset = (pnum + 1) * psize;
}
return generalData->imageSize;

View File

@ -19,20 +19,20 @@ using namespace std;
/** cosntructor & destructor */
UDPStandardImplementation::UDPStandardImplementation() {
FILE_LOG (logDEBUG) << __AT__ << " called";
InitializeMembers();
}
UDPStandardImplementation::~UDPStandardImplementation() {
FILE_LOG (logDEBUG) << __AT__ << " called";
DeleteMembers();
}
void UDPStandardImplementation::DeleteMembers() {
FILE_LOG (logDEBUG) << __AT__ << " starting";
if (generalData) { delete generalData; generalData=0;}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
@ -51,7 +51,7 @@ void UDPStandardImplementation::DeleteMembers() {
void UDPStandardImplementation::InitializeMembers() {
FILE_LOG (logDEBUG) << __AT__ << " starting";
UDPBaseImplementation::initializeMembers();
acquisitionPeriod = SAMPLE_TIME_IN_NS;
@ -76,7 +76,7 @@ void UDPStandardImplementation::InitializeMembers() {
/*** Overloaded Functions called by TCP Interface ***/
uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -94,7 +94,7 @@ uint64_t UDPStandardImplementation::getTotalFramesCaught() const {
}
uint64_t UDPStandardImplementation::getFramesCaught() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -111,7 +111,7 @@ uint64_t UDPStandardImplementation::getFramesCaught() const {
}
int64_t UDPStandardImplementation::getAcquisitionIndex() const {
FILE_LOG (logDEBUG) << __AT__ << " starting";
uint64_t sum = 0;
uint32_t flagsum = 0;
@ -131,6 +131,15 @@ int64_t UDPStandardImplementation::getAcquisitionIndex() const {
void UDPStandardImplementation::setFileFormat(const fileFormat f){
FILE_LOG(logDEBUG) << __AT__ << " starting";
switch(f){
#ifdef HDF5C
case HDF5:
fileFormatType = f;
#endif
default:
fileFormatType = f;
break;
}
//destroy file writer, set file format and create file writer
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetFileFormat(f);
@ -141,7 +150,7 @@ void UDPStandardImplementation::setFileFormat(const fileFormat f){
void UDPStandardImplementation::setFileName(const char c[]) {
FILE_LOG (logDEBUG) << __AT__ << " starting";
if (strlen(c)) {
strcpy(fileName, c); //automatically update fileName in Filewriter (pointer)
@ -163,7 +172,7 @@ void UDPStandardImplementation::setFileName(const char c[]) {
int UDPStandardImplementation::setShortFrameEnable(const int i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (myDetectorType != GOTTHARD) {
cprintf(RED, "Error: Can not set short frame for this detector\n");
@ -183,12 +192,10 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
if (SetupFifoStructure() == FAIL)
return FAIL;
Listener::SetGeneralData(generalData);
DataProcessor::SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->SetMaxFramesPerFile();
}
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetGeneralData(generalData);
}
FILE_LOG (logINFO) << "Short Frame Enable: " << shortFrameEnable;
return OK;
@ -196,7 +203,7 @@ int UDPStandardImplementation::setShortFrameEnable(const int i) {
int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (frameToGuiFrequency != freq) {
frameToGuiFrequency = freq;
@ -218,7 +225,7 @@ int UDPStandardImplementation::setFrameToGuiFrequency(const uint32_t freq) {
int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (dataStreamEnable != enable) {
dataStreamEnable = enable;
@ -247,7 +254,7 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {
int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (acquisitionPeriod != i) {
acquisitionPeriod = i;
@ -269,7 +276,7 @@ int UDPStandardImplementation::setAcquisitionPeriod(const uint64_t i) {
int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (acquisitionTime != i) {
acquisitionTime = i;
@ -291,7 +298,7 @@ int UDPStandardImplementation::setAcquisitionTime(const uint64_t i) {
int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (numberOfFrames != i) {
numberOfFrames = i;
@ -313,10 +320,11 @@ int UDPStandardImplementation::setNumberOfFrames(const uint64_t i) {
int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (dynamicRange != i) {
dynamicRange = i;
//side effects
generalData->SetDynamicRange(i,tengigaEnable);
@ -330,7 +338,7 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i) {
int UDPStandardImplementation::setTenGigaEnable(const bool b) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (tengigaEnable != b) {
tengigaEnable = b;
@ -347,7 +355,7 @@ int UDPStandardImplementation::setTenGigaEnable(const bool b) {
int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
FILE_LOG (logDEBUG) << __AT__ << " called";
if (fifoDepth != i) {
fifoDepth = i;
@ -363,7 +371,7 @@ int UDPStandardImplementation::setFifoDepth(const uint32_t i) {
int UDPStandardImplementation::setDetectorType(const detectorType d) {
FILE_LOG (logDEBUG) << __AT__ << " starting";
FILE_LOG (logDEBUG) << "Setting receiver type";
@ -395,8 +403,6 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
case JUNGFRAU: generalData = new JungfrauData(); break;
default: break;
}
Listener::SetGeneralData(generalData);
DataProcessor::SetGeneralData(generalData);
numThreads = generalData->threadsPerReceiver;
fifoDepth = generalData->defaultFifoDepth;
@ -428,11 +434,12 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
}
//set up writer and callbacks
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->SetupFileWriter(fileName, filePath, &fileIndex, &frameIndexEnable,
&overwriteEnable, &detID, &numThreads);
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->SetGeneralData(generalData);
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->SetupFileWriter(fileName, filePath, &fileIndex, &frameIndexEnable,
&overwriteEnable, &detID, &numThreads, &numberOfFrames, &dynamicRange, generalData);
}
FILE_LOG (logDEBUG) << " Detector type set to " << getDetectorType(d);
return OK;
}
@ -440,7 +447,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
void UDPStandardImplementation::resetAcquisitionCount() {
FILE_LOG (logDEBUG) << __AT__ << " starting";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewAcquisition();
@ -451,13 +458,39 @@ void UDPStandardImplementation::resetAcquisitionCount() {
FILE_LOG (logINFO) << "Acquisition Count has been reset";
}
int UDPStandardImplementation::VerifyCallBackAction() {
/** file path and file index not required?? or need to include detector index? do they need the datasize? its given for write data anyway */
callbackAction=startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
switch(callbackAction) {
case 0:
if (acquisitionFinishedCallBack == NULL || rawDataReadyCallBack == NULL) {
FILE_LOG(logERROR) << "Callback action 0: All the call backs must be registered";
return FAIL;
}
cout << "Start Acquisition, Acquisition Finished and Data Write has been defined externally" << endl;
break;
case 1:
if (rawDataReadyCallBack == NULL) {
FILE_LOG(logERROR) << "Callback action 1: RawDataReady call backs must be registered";
return FAIL;
}
cout << "Data Write defined externally" << endl;
break;
case 2:break;
default:
cprintf(RED,"Error: Unknown call back action.\n");
return FAIL;
}
return OK;
}
int UDPStandardImplementation::startReceiver(char *c) {
FILE_LOG (logDEBUG) << __AT__ << " called";
ResetParametersforNewMeasurement();
//listener
if (CreateUDPSockets() == FAIL) {
strcpy(c,"Could not create UDP Socket(s).");
FILE_LOG(logERROR) << c;
@ -465,33 +498,24 @@ int UDPStandardImplementation::startReceiver(char *c) {
}
cout << "Listener Ready ..." << endl;
if(fileWriteEnable){
if (SetupWriter() == FAIL) {
//callbacks
callbackAction = DO_EVERYTHING;
if (startAcquisitionCallBack)
if (VerifyCallBackAction() == FAIL)
return FAIL;
//processor->writer
if (fileWriteEnable) {
if (callbackAction > DO_NOTHING && SetupWriter() == FAIL) {
strcpy(c,"Could not create file.");
FILE_LOG(logERROR) << c;
return FAIL;
}
}
} else
cout << "Data will not be saved" << endl;
cout << "Processor Ready ..." << endl;
//callbacks
callbackAction = DO_EVERYTHING;
if (startAcquisitionCallBack) /** file path and file index not required?? or need to include detector index? do they need the datasize? its given for write data anyway */
callbackAction=startAcquisitionCallBack(filePath, fileName, fileIndex,
(generalData->fifoBufferSize) * numberofJobs + (generalData->fifoBufferHeaderSize), pStartAcquisition);
if (callbackAction < DO_EVERYTHING) {
FILE_LOG(logINFO) << "Call back activated. Data saving must be taken care of by user in call back.";
if (rawDataReadyCallBack) {
FILE_LOG(logINFO) << "Data Write has been defined externally";
}
} else if(!fileWriteEnable) {
FILE_LOG(logINFO) << "Data will not be saved";
}
//change status
//status
pthread_mutex_lock(&statusMutex);
status = RUNNING;
pthread_mutex_unlock(&(statusMutex));
@ -612,7 +636,7 @@ void UDPStandardImplementation::startReadout(){
void UDPStandardImplementation::shutDownUDPSockets() {
FILE_LOG (logDEBUG) << __AT__ << " called";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ShutDownUDPSocket();
}
@ -620,15 +644,15 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() {
FILE_LOG (logDEBUG) << __AT__ << " called";
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->CloseFile();
(*it)->CloseFiles();
}
void UDPStandardImplementation::SetLocalNetworkParameters() {
FILE_LOG (logDEBUG) << __AT__ << " called";
//to increase socket receiver buffer size and max length of input queue by changing kernel settings
if (myDetectorType == EIGER)
@ -657,7 +681,7 @@ void UDPStandardImplementation::SetLocalNetworkParameters() {
int UDPStandardImplementation::SetupFifoStructure() {
FILE_LOG (logDEBUG) << __AT__ << " called";
//recalculate number of jobs & fifodepth, return if no change
@ -721,7 +745,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
void UDPStandardImplementation::ResetParametersforNewMeasurement() {
FILE_LOG (logDEBUG) << __AT__ << " called";
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
@ -731,7 +755,7 @@ void UDPStandardImplementation::ResetParametersforNewMeasurement() {
int UDPStandardImplementation::CreateUDPSockets() {
FILE_LOG (logDEBUG) << __AT__ << " called";
bool error = false;
for (unsigned int i = 0; i < listener.size(); ++i)
@ -750,11 +774,11 @@ int UDPStandardImplementation::CreateUDPSockets() {
int UDPStandardImplementation::SetupWriter() {
FILE_LOG (logDEBUG) << __AT__ << " called";
bool error = false;
for (unsigned int i = 0; i < dataProcessor.size(); ++i)
if (dataProcessor[i]->CreateNewFile() == FAIL) {
if (dataProcessor[i]->CreateNewFile(tengigaEnable,
numberOfFrames, acquisitionTime, acquisitionPeriod) == FAIL) {
error = true;
break;
}
@ -770,7 +794,7 @@ int UDPStandardImplementation::SetupWriter() {
void UDPStandardImplementation::StartRunning() {
FILE_LOG (logDEBUG) << __AT__ << " called";
//set running mask and post semaphore to start the inner loop in execution thread
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it) {