slsReceiver HDF5: unlimited x dimension (#images) by extending by numImages if caught more images, fixed virtual mapping, fixed linking for jungfrau single module (removed virtual_ in all namings

This commit is contained in:
2018-07-17 14:07:23 +02:00
parent 6d11faed87
commit 934bc5b9db
8 changed files with 114 additions and 34 deletions

View File

@ -198,9 +198,10 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject {
/** /**
* End of Acquisition * End of Acquisition
* @param anyPacketsCaught true if any packets are caught, else false
* @param numf number of images caught * @param numf number of images caught
*/ */
void EndofAcquisition(uint64_t numf); void EndofAcquisition(bool anyPacketsCaught, uint64_t numf);
private: private:

View File

@ -159,9 +159,10 @@ class File : private virtual slsReceiverDefs {
/** /**
* End of Acquisition * End of Acquisition
* @param anyPacketsCaught true if any packets are caught, else false
* @param numf number of images caught * @param numf number of images caught
*/ */
virtual void EndofAcquisition(uint64_t numf) { virtual void EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
cprintf(RED,"This is a generic function EndofAcquisition that should be overloaded by a derived class\n"); cprintf(RED,"This is a generic function EndofAcquisition that should be overloaded by a derived class\n");
} }

View File

@ -111,9 +111,10 @@ class HDF5File : private virtual slsReceiverDefs, public File, public HDF5FileSt
/** /**
* End of Acquisition * End of Acquisition
* @param anyPacketsCaught true if any packets are caught, else false
* @param numf number of images caught * @param numf number of images caught
*/ */
void EndofAcquisition(uint64_t numf); void EndofAcquisition(bool anyPacketsCaught, uint64_t numf);
/** /**
* Create Virtual File * Create Virtual File
@ -180,5 +181,8 @@ class HDF5File : private virtual slsReceiverDefs, public File, public HDF5FileSt
/** Dataset array for parameters */ /** Dataset array for parameters */
DataSet* dataset_para[HDF5FileStatic::NumberofParameters]; DataSet* dataset_para[HDF5FileStatic::NumberofParameters];
/** Number of Images (including extended during acquisition) */
uint64_t extNumImages;
}; };
#endif #endif

View File

@ -187,6 +187,12 @@ public:
/** /**
* Write Parameter Arrays as datasets (to virtual file) * Write Parameter Arrays as datasets (to virtual file)
* @param ind self index
* @param dpace_para parameter dataspace
* @param fnum frame number current
* @param dset_para parameter dataset
* @param header image header
* @returns 0 for success and 1 for fail
*/ */
static int WriteParameterDatasets(int ind, DataSpace* dspace_para, uint64_t fnum, static int WriteParameterDatasets(int ind, DataSpace* dspace_para, uint64_t fnum,
DataSet* dset_para[],sls_detector_header* header) DataSet* dset_para[],sls_detector_header* header)
@ -219,6 +225,44 @@ public:
return 0; return 0;
} }
/**
* Extend datasets in #images dimension (x dimension)
* @param ind self index
* @param dpace data space pointer address
* @param dset data set pointer
* @param dspace_para parameter dataspace address pointer
* @param dset dataset parameter pointer
* @param initialNumImages initial number of images
* @returns 0 for success and 1 for fail
*/
static int ExtendDataset(int ind, DataSpace*& dspace, DataSet* dset,
DataSpace*& dspace_para, DataSet* dset_para[],
uint64_t initialNumImages) {
try{
Exception::dontPrint(); //to handle errors
hsize_t dims[3];
herr_t status_n = dspace->getSimpleExtentDims(dims);
dims[0] += initialNumImages;
dset->extend(dims);
delete dspace;
dspace = new DataSpace(dset->getSpace());
hsize_t dims_para[1] = {dims[0]};
for (int i = 0; i < NumberofParameters; ++i)
dset_para[i]->extend(dims_para);
delete dspace_para;
dspace_para = new DataSpace(dset_para[0]->getSpace());
}
catch(Exception error){
cprintf(RED,"Error in extending dataset in object %d\n",ind);
error.printError();
return 1;
}
return 0;
}
/** /**
* Create master file * Create master file
@ -386,7 +430,8 @@ public:
//dataspace //dataspace
hsize_t srcdims[3] = {nDimx, nDimy, nDimz}; hsize_t srcdims[3] = {nDimx, nDimy, nDimz};
dspace = new DataSpace (3,srcdims); hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz};
dspace = new DataSpace (3,srcdims,srcdimsmax);
//dataset name //dataset name
ostringstream osfn; ostringstream osfn;
@ -396,19 +441,24 @@ public:
//dataset //dataset
//chunked dataset if greater than max_chunked_images //chunked dataset if greater than max_chunked_images
if(nDimx > maxchunkedimages){ // always create chunked dataset as unlimited is only supported with chunked layout
DSetCreatPropList plist; DSetCreatPropList plist;
hsize_t chunk_dims[3] ={maxchunkedimages, nDimy, nDimz}; hsize_t chunk_dims[3] ={maxchunkedimages, nDimy, nDimz};
plist.setChunk(3, chunk_dims); plist.setChunk(3, chunk_dims);
dset = new DataSet (fd->createDataSet(dsetname.c_str(), dtype, *dspace, plist)); dset = new DataSet (fd->createDataSet(dsetname.c_str(), dtype, *dspace, plist));
}else
dset = new DataSet (fd->createDataSet(dsetname.c_str(), dtype, *dspace));
//create parameter datasets //create parameter datasets
hsize_t dims[1] = {nDimx}; hsize_t dims[1] = {nDimx};
dspace_para = new DataSpace (1,dims); hsize_t dimsmax[1] = {H5S_UNLIMITED};
dspace_para = new DataSpace (1,dims,dimsmax);
// always create chunked dataset as unlimited is only supported with chunked layout
DSetCreatPropList paralist;
hsize_t chunkpara_dims[3] ={maxchunkedimages};
paralist.setChunk(1, chunkpara_dims);
for (int i = 0; i < NumberofParameters; ++i) for (int i = 0; i < NumberofParameters; ++i)
dset_para[i] = new DataSet(fd->createDataSet(ParameterNames[i], ParameterDataTypes[i], *dspace_para)); dset_para[i] = new DataSet(fd->createDataSet(ParameterNames[i], ParameterDataTypes[i], *dspace_para, paralist));
} }
catch(Exception error){ catch(Exception error){
cprintf(RED,"Error in creating HDF5 handles in object %d\n",ind); cprintf(RED,"Error in creating HDF5 handles in object %d\n",ind);
@ -549,11 +599,13 @@ public:
//source dataspace //source dataspace
hsize_t srcdims[3] = {nDimx, nDimy, nDimz}; hsize_t srcdims[3] = {nDimx, nDimy, nDimz};
hid_t srcDataspace = H5Screate_simple(3, srcdims, NULL); hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz};
hid_t srcDataspace = H5Screate_simple(3, srcdims, srcdimsmax);
if (srcDataspace < 0) if (srcDataspace < 0)
return CloseFileOnError(fd, string("Error in creating source dataspace in virtual file ") + virtualFileName + string("\n")); return CloseFileOnError(fd, string("Error in creating source dataspace in virtual file ") + virtualFileName + string("\n"));
hsize_t srcdims_para[1] = {nDimx}; hsize_t srcdims_para[1] = {nDimx};
hid_t srcDataspace_para = H5Screate_simple(1, srcdims_para, NULL); hsize_t srcdimsmax_para[1] = {H5S_UNLIMITED};
hid_t srcDataspace_para = H5Screate_simple(1, srcdims_para, srcdimsmax_para);
if (srcDataspace_para < 0) if (srcDataspace_para < 0)
return CloseFileOnError(fd, string("Error in creating source dataspace (parameters) in virtual file ") + virtualFileName + string("\n")); return CloseFileOnError(fd, string("Error in creating source dataspace (parameters) in virtual file ") + virtualFileName + string("\n"));
@ -587,7 +639,7 @@ public:
return CloseFileOnError(fd, string("Error in mapping files in virtual file ") + virtualFileName + string("\n")); return CloseFileOnError(fd, string("Error in mapping files in virtual file ") + virtualFileName + string("\n"));
//dataset //dataset
string virtualDatasetName = string("/virtual_") + srcDataseName; string virtualDatasetName = /*string("/virtual_") + */srcDataseName;
hid_t vdsdataset = H5Dcreate2 (fd, virtualDatasetName.c_str(), GetDataTypeinC(dataType), vdsDataspace, H5P_DEFAULT, dcpl, H5P_DEFAULT); hid_t vdsdataset = H5Dcreate2 (fd, virtualDatasetName.c_str(), GetDataTypeinC(dataType), vdsDataspace, H5P_DEFAULT, dcpl, H5P_DEFAULT);
if (vdsdataset < 0) if (vdsdataset < 0)
return CloseFileOnError(fd, string("Error in creating virutal dataset in virtual file ") + virtualFileName + string("\n")); return CloseFileOnError(fd, string("Error in creating virutal dataset in virtual file ") + virtualFileName + string("\n"));
@ -596,7 +648,7 @@ public:
//virtual parameter dataset //virtual parameter dataset
for (int i = 0; i < NumberofParameters; ++i) { for (int i = 0; i < NumberofParameters; ++i) {
hid_t vdsdataset_para = H5Dcreate2 (fd, hid_t vdsdataset_para = H5Dcreate2 (fd,
(string("/virtual_") + string (ParameterNames[i])).c_str(), (/*string("/virtual_") + */string (ParameterNames[i])).c_str(),
GetDataTypeinC(ParameterDataTypes[i]), vdsDataspace_para, H5P_DEFAULT, dcpl_para[i], H5P_DEFAULT); GetDataTypeinC(ParameterDataTypes[i]), vdsDataspace_para, H5P_DEFAULT, dcpl_para[i], H5P_DEFAULT);
if (vdsdataset_para < 0) if (vdsdataset_para < 0)
return CloseFileOnError(fd, string("Error in creating virutal dataset (parameters) in virtual file ") + virtualFileName + string("\n")); return CloseFileOnError(fd, string("Error in creating virutal dataset (parameters) in virtual file ") + virtualFileName + string("\n"));
@ -755,13 +807,13 @@ public:
//**paramter datasets** //**paramter datasets**
for (int i = 0; i < NumberofParameters; ++i){ for (int i = 0; i < NumberofParameters; ++i){
hid_t vdset_para = H5Dopen2( vfd, (string("/virtual_") + string (ParameterNames[i])).c_str(), H5P_DEFAULT); hid_t vdset_para = H5Dopen2( vfd, (/*string("/virtual_") + */string (ParameterNames[i])).c_str(), H5P_DEFAULT);
if (vdset_para < 0) { if (vdset_para < 0) {
H5Fclose(mfd); mfd = 0; H5Fclose(mfd); mfd = 0;
return CloseFileOnError( vfd, string("Error in opening virtual parameter dataset to create link\n")); return CloseFileOnError( vfd, string("Error in opening virtual parameter dataset to create link\n"));
} }
sprintf(linkname, "/entry/data/%s",(string("/virtual_") + string (ParameterNames[i])).c_str()); sprintf(linkname, "/entry/data/%s",(/*string("/virtual_") + */string (ParameterNames[i])).c_str());
if(H5Lcreate_external( virtualfname.c_str(), (string("/virtual_") + string (ParameterNames[i])).c_str(), if(H5Lcreate_external( virtualfname.c_str(), (/*string("/virtual_") + */string (ParameterNames[i])).c_str(),
mfd, linkname, H5P_DEFAULT, H5P_DEFAULT) < 0) { mfd, linkname, H5P_DEFAULT, H5P_DEFAULT) < 0) {
H5Fclose(mfd); mfd = 0; H5Fclose(mfd); mfd = 0;
return CloseFileOnError( vfd, string("Error in creating link to virtual parameter dataset\n")); return CloseFileOnError( vfd, string("Error in creating link to virtual parameter dataset\n"));

View File

@ -282,9 +282,9 @@ void DataProcessor::CloseFiles() {
file->CloseAllFiles(); file->CloseAllFiles();
} }
void DataProcessor::EndofAcquisition(uint64_t numf) { void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
if (file && file->GetFileType() == HDF5) { if (file && file->GetFileType() == HDF5) {
file->EndofAcquisition(numf); file->EndofAcquisition(anyPacketsCaught, numf);
} }
} }

View File

@ -38,7 +38,8 @@ HDF5File::HDF5File(int ind, uint32_t maxf, const uint32_t* ppf,
numFramesInFile(0), numFramesInFile(0),
numActualPacketsInFile(0), numActualPacketsInFile(0),
numFilesinAcquisition(0), numFilesinAcquisition(0),
dataspace_para(0) dataspace_para(0),
extNumImages(0)
{ {
#ifdef VERBOSE #ifdef VERBOSE
PrintMembers(); PrintMembers();
@ -97,7 +98,7 @@ int HDF5File::CreateFile(uint64_t fnum) {
//first time //first time
if(!fnum) UpdateDataType(); if(!fnum) UpdateDataType();
uint64_t framestosave = ((*numImages - fnum) > maxFramesPerFile) ? maxFramesPerFile : (*numImages-fnum); uint64_t framestosave = ((extNumImages - fnum) > maxFramesPerFile) ? maxFramesPerFile : (extNumImages-fnum);
pthread_mutex_lock(&Mutex); pthread_mutex_lock(&Mutex);
if (HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, *frameIndexEnable, if (HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, *frameIndexEnable,
fnum, framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), fnum, framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
@ -145,6 +146,19 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
numFramesInFile++; numFramesInFile++;
numActualPacketsInFile += nump; numActualPacketsInFile += nump;
pthread_mutex_lock(&Mutex); pthread_mutex_lock(&Mutex);
// extend dataset (when receiver start followed by many status starts (jungfrau)))
if (fnum >= extNumImages) {
if (HDF5FileStatic::ExtendDataset(index, dataspace, dataset,
dataspace_para, dataset_para, *numImages) == OK) {
if (!silentMode) {
cprintf(BLUE,"%d Extending HDF5 dataset by %llu, Total x Dimension: %u\n",
index, extNumImages, extNumImages + *numImages);
}
extNumImages += *numImages;
}
}
if (HDF5FileStatic::WriteDataFile(index, buffer + sizeof(sls_detector_header), if (HDF5FileStatic::WriteDataFile(index, buffer + sizeof(sls_detector_header),
fnum%maxFramesPerFile, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), fnum%maxFramesPerFile, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
dataspace, dataset, datatype) == OK) { dataspace, dataset, datatype) == OK) {
@ -163,12 +177,16 @@ int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum, uint32_t
} }
int HDF5File::CreateMasterFile(bool en, uint32_t size, int HDF5File::CreateMasterFile(bool en, uint32_t size,
uint32_t nx, uint32_t ny, uint64_t at, uint64_t st, uint64_t ap) { uint32_t nx, uint32_t ny, uint64_t at, uint64_t st, uint64_t ap) {
//beginning of every acquisition //beginning of every acquisition
numFramesInFile = 0; numFramesInFile = 0;
numActualPacketsInFile = 0; numActualPacketsInFile = 0;
extNumImages = *numImages;
if (master && (*detIndex==0)) { if (master && (*detIndex==0)) {
virtualfd = 0; virtualfd = 0;
@ -185,11 +203,11 @@ int HDF5File::CreateMasterFile(bool en, uint32_t size,
} }
void HDF5File::EndofAcquisition(uint64_t numf) { void HDF5File::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
//not created before //not created before
if (!virtualfd) { if (!virtualfd && anyPacketsCaught) {
//only one file and one sub image //only one file and one sub image (link current file in master)
if (((numFilesinAcquisition == 1) && (numDetY*numDetX) == 1)) { if (((numFilesinAcquisition == 1) && (numDetY*numDetX) == 1)) {
//dataset name //dataset name
ostringstream osfn; ostringstream osfn;
@ -217,7 +235,7 @@ int HDF5File::CreateVirtualFile(uint64_t numf) {
virtualfd, masterFileName, virtualfd, masterFileName,
filePath, fileNamePrefix, *fileIndex, *frameIndexEnable, filePath, fileNamePrefix, *fileIndex, *frameIndexEnable,
*detIndex, *numUnitsPerDetector, *detIndex, *numUnitsPerDetector,
maxFramesPerFile, numf, maxFramesPerFile, numf+1,
"data", datatype, "data", datatype,
numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), numDetY, numDetX, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
HDF5_WRITER_VERSION); HDF5_WRITER_VERSION);

View File

@ -290,7 +290,7 @@ void UDPBaseImplementation::setFilePath(const char c[]){
else else
FILE_LOG(logERROR) << "FilePath does not exist: " << filePath; FILE_LOG(logERROR) << "FilePath does not exist: " << filePath;
} }
FILE_LOG(logINFO) << "Info: File path: " << filePath; FILE_LOG(logINFO) << "File path: " << filePath;
} }
void UDPBaseImplementation::setFileIndex(const uint64_t i){ void UDPBaseImplementation::setFileIndex(const uint64_t i){

View File

@ -462,8 +462,9 @@ void UDPStandardImplementation::stopReceiver(){
if((*it)->GetMeasurementStartedFlag()) if((*it)->GetMeasurementStartedFlag())
anycaught = true; anycaught = true;
} }
if (anycaught)
dataProcessor[0]->EndofAcquisition(maxIndexCaught); //to create virtual file //to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught);
} }
while(DataStreamer::GetRunningMask()){ while(DataStreamer::GetRunningMask()){
@ -480,7 +481,7 @@ void UDPStandardImplementation::stopReceiver(){
tot += dataProcessor[i]->GetNumFramesCaught(); tot += dataProcessor[i]->GetNumFramesCaught();
uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught(); uint64_t missingpackets = numberOfFrames*generalData->packetsPerFrame-listener[i]->GetPacketsCaught();
if (missingpackets) { if ((int)missingpackets > 0) {
cprintf(RED, "\n[Port %d]\n",udpPortNum[i]); cprintf(RED, "\n[Port %d]\n",udpPortNum[i]);
cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets); cprintf(RED, "Missing Packets\t\t: %lld\n",(long long int)missingpackets);
cprintf(RED, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught()); cprintf(RED, "Complete Frames\t\t: %lld\n",(long long int)dataProcessor[i]->GetNumFramesCaught());
@ -562,12 +563,15 @@ void UDPStandardImplementation::shutDownUDPSockets() {
void UDPStandardImplementation::closeFiles() { void UDPStandardImplementation::closeFiles() {
uint64_t maxIndexCaught = 0; uint64_t maxIndexCaught = 0;
bool anycaught = false;
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) { for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it) {
(*it)->CloseFiles(); (*it)->CloseFiles();
maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex()); maxIndexCaught = max(maxIndexCaught, (*it)->GetProcessedMeasurementIndex());
if((*it)->GetMeasurementStartedFlag())
anycaught = true;
} }
if (maxIndexCaught) //to create virtual file & set files/acquisition to 0 (only hdf5 at the moment)
dataProcessor[0]->EndofAcquisition(maxIndexCaught); dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught);
} }