moving towards c++ api

This commit is contained in:
Dhanya Maliakal
2016-12-12 14:24:24 +01:00
parent 28b0897dd7
commit d3200dc76f
10 changed files with 439 additions and 34 deletions

View File

@ -21,6 +21,13 @@
#include <zmq.h> //zmq
#include <sstream>
#ifdef HDF5C
#include "H5Cpp.h"
#ifndef H5_NO_NAMESPACE
using namespace H5;
#endif
#endif
using namespace std;
#define WRITE_HEADERS
@ -152,7 +159,15 @@ void UDPStandardImplementation::initializeMembers(){
strcpy(fileNamePerThread[i],"");
strcpy(fileHeader[i],"");
sfilefd[i] = 0;
#ifdef HDF5C
hdf5_fileId[i] = 0;
hdf5_dataspaceId[i] = 0;
hdf5_datasetId[i] = 0;
#endif
}
#ifdef HDF5C
hdf5DataType = H5T_STD_U16LE;
#endif
maxFramesPerFile = 0;
fileCreateSuccess = false;
@ -218,6 +233,10 @@ void UDPStandardImplementation::initializeMembers(){
createFileMask = 0x0;
killAllWritingThreads = false;
//***data shape ***
NX = 0;
NY = 0;
//***filter parameters***
commonModeSubtractionEnable = false;
moenchCommonModeSubtraction = 0;
@ -632,6 +651,15 @@ int UDPStandardImplementation::setDynamicRange(const uint32_t i){
FILE_LOG(logDEBUG) << "Info: Setting Dynamic Range to " << i;
dynamicRange = i;
#ifdef HDF5C
switch(dynamicRange){
case 4:
case 8: hdf5DataType = H5T_STD_U8LE; break;
case 16: hdf5DataType = H5T_STD_U16LE; break;
case 32: hdf5DataType = H5T_STD_U32LE; break;
default: cprintf(BG_RED,"unknown dynamic range\n");
}
#endif
if(myDetectorType == EIGER){
@ -828,6 +856,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
footerOffset = EIGER_DATA_PACKET_HEADER_SIZE + oneDataSize;
fifoBufferHeaderSize= (HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH);
excludeMissingPackets= true;
NX = EIGER_PIXELS_IN_ONE_ROW;
NY = EIGER_PIXELS_IN_ONE_COL;
break;
case JUNGFRAUCTB:
packetsPerFrame = JCTB_PACKETS_PER_FRAME;
@ -857,6 +887,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){
fifoBufferHeaderSize= (HEADER_SIZE_NUM_TOT_PACKETS + FILE_FRAME_HEADER_LENGTH);
//footerOffset = Not applicable;
excludeMissingPackets=true;
NX = JFRAU_PIXELS_IN_ONE_ROW;
NY = JFRAU_PIXELS_IN_ONE_COL;
break;
default:
FILE_LOG(logERROR) << "This is an unknown receiver type " << (int)d;
@ -968,6 +1000,35 @@ int UDPStandardImplementation::startReceiver(char *c){
fclose(sfilefd[i]);
sfilefd[i] = 0;
}
#ifdef HDF5C
pthread_mutex_lock(&writeMutex);
try{
Exception::dontPrint(); //to handle errors
if(hdf5_fileId[i]) delete hdf5_fileId[i];
if(hdf5_dataspaceId[i]) delete hdf5_dataspaceId[i];
if(hdf5_datasetId[i]) delete hdf5_datasetId[i];
}// end of try block
catch(Exception error){
cprintf(RED,"Error in closing HDF5 handles\n");
error.printError();
return -1;
}
/*
if(hdf5DataspaceId[i]){
H5Sclose(hdf5DataspaceId[i]);
hdf5DataspaceId[i] = 0;
}
if(hdf5DatasetId[i]){
H5Dclose(hdf5DatasetId[i]);
hdf5DatasetId[i] = 0;
}
if(hdf5fileId[i]){
H5Fclose(hdf5fileId[i]);
hdf5fileId[i] = 0;
}*/
pthread_mutex_unlock(&writeMutex);
#endif
//reset gui variables
frametoGuiCounter[i] = 0;
guiNumPackets[i] = 0;
@ -1201,6 +1262,36 @@ void UDPStandardImplementation::closeFile(int ithread){
fclose(sfilefd[ithread]);
sfilefd[ithread] = 0;
}
#ifdef HDF5C
pthread_mutex_lock(&writeMutex);
try{
Exception::dontPrint(); //to handle errors
if(hdf5_fileId[i]) delete hdf5_fileId[i];
if(hdf5_dataspaceId[i]) delete hdf5_dataspaceId[i];
if(hdf5_datasetId[i]) delete hdf5_datasetId[i];
}// end of try block
catch(Exception error){
cprintf(RED,"Error in closing HDF5 handles\n");
error.printError();
return -1;
}
/*
if(hdf5DataspaceId[ithread]){
H5Sclose(hdf5DataspaceId[ithread]);
hdf5DataspaceId[ithread] = 0;
}
if(hdf5DatasetId[ithread]){
H5Dclose(hdf5DatasetId[ithread]);
hdf5DatasetId[ithread] = 0;
}
if(hdf5fileId[ithread]){
H5Fclose(hdf5fileId[ithread]);
hdf5fileId[ithread] = 0;
}
*/
pthread_mutex_unlock(&writeMutex);
#endif
}
//compression
@ -1559,7 +1650,12 @@ int UDPStandardImplementation::setupWriter(){
if(fileCreateSuccess == OK){
FILE_LOG(logDEBUG) << "Successfully created file(s)";
#ifdef HDF5C
if(fileFormatType == HDF5)
cout << "HDF5 Writer Ready ..." << endl;
#else
cout << "Writer Ready ..." << endl;
#endif
}
return fileCreateSuccess;
@ -1572,44 +1668,150 @@ int UDPStandardImplementation::createNewFile(int ithread){
//create file name
if(!frameIndexEnable)
sprintf(completeFileName[ithread], "%s/%s_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
sprintf(completeFileName[ithread], "%s/%s_%lld", filePath,fileNamePerThread[ithread],(long long int)fileIndex);
else
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld.raw", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
sprintf(completeFileName[ithread], "%s/%s_f%012lld_%lld", filePath,fileNamePerThread[ithread],(long long int)lastFrameNumberInFile[ithread]+1,(long long int)fileIndex);
//file type
switch(fileFormatType){
#ifdef HDF5C
case HDF5:
strcat(completeFileName[ithread],".h5");
break;
#endif
default:
strcat(completeFileName[ithread],".raw");
break;
}
#ifdef DEBUG4
FILE_LOG(logINFO) << completefileName;
#endif
//filewrite enable & we allowed to create/close files
if(fileWriteEnable && cbAction > DO_NOTHING){
//close file pointers
if(sfilefd[ithread]){
//all threads need to close file, reset mask and exit loop
if(myDetectorType == EIGER && fileWriteEnable && (cbAction > DO_NOTHING)){
updateFileHeader(ithread);
fseek(sfilefd[ithread],0,0);
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
}
fflush(sfilefd[ithread]);
fclose(sfilefd[ithread]);
sfilefd[ithread] = 0;
}
//create file
if(!overwriteEnable){
if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){
FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread];
if(fileFormatType == BINARY){
//close file pointers
if(sfilefd[ithread]){
//all threads need to close file, reset mask and exit loop
if(myDetectorType == EIGER && fileWriteEnable && (cbAction > DO_NOTHING)){
updateFileHeader(ithread);
fseek(sfilefd[ithread],0,0);
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
}
fflush(sfilefd[ithread]);
fclose(sfilefd[ithread]);
sfilefd[ithread] = 0;
}
//create file
if(!overwriteEnable){
if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "wx"))){
FILE_LOG(logERROR) << "Could not create/overwrite file" << completeFileName[ithread];
sfilefd[ithread] = 0;
return FAIL;
}
}else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){
FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread];
sfilefd[ithread] = 0;
return FAIL;
}
}else if (NULL == (sfilefd[ithread] = fopen((const char *) (completeFileName[ithread]), "w"))){
FILE_LOG(logERROR) << "Could not create file" << completeFileName[ithread];
sfilefd[ithread] = 0;
return FAIL;
//setting file buffer size to 16mb
setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE);
}
//setting file buffer size to 16mb
setvbuf(sfilefd[ithread],NULL,_IOFBF,BUF_SIZE);
#ifdef HDF5C
else if(fileFormatType == HDF5){
pthread_mutex_lock(&writeMutex);
if(hdf5DataspaceId[ithread]){
H5Sclose(hdf5DataspaceId[ithread]);
hdf5DataspaceId[ithread] = 0;
}
if(hdf5DatasetId[ithread]){
H5Dclose(hdf5DatasetId[ithread]);
hdf5DatasetId[ithread] = 0;
}
if(hdf5fileId[ithread]){
// update attributes
bool error = true;
hsize_t dims = 1;
hid_t hdf5AttributeDataspaceId = H5Screate_simple (1, &dims, NULL);
if(hdf5AttributeDataspaceId>=0){
hid_t hdf5AttributeId = H5Acreate2(hdf5fileId[ithread],"Dynamic Range",H5T_STD_I32LE,hdf5AttributeDataspaceId,
H5P_DEFAULT, H5P_DEFAULT);
if(hdf5AttributeId>=0){
if(H5Awrite(hdf5AttributeId,H5T_STD_I32LE,&dynamicRange)>=0){
if(H5Aclose(hdf5AttributeId)>=0){
error = false;
}
}
}
}
if(error){
FILE_LOG(logERROR) << "Could not create attribute for " << completeFileName[ithread] << endl;
pthread_mutex_unlock(&writeMutex);
return FAIL;
}
H5Fclose(hdf5fileId[ithread]);
hdf5fileId[ithread] = 0;
}
//create file
if(!overwriteEnable){
hdf5fileId[ithread] = H5Fcreate(completeFileName[ithread], H5F_ACC_EXCL, H5P_DEFAULT, H5P_DEFAULT);
if(hdf5fileId[ithread]<0){
FILE_LOG(logERROR) << "Could not create new file" << completeFileName[ithread] << ". Check if it exists or permissions" << endl;
pthread_mutex_unlock(&writeMutex);
return FAIL;
}
}else{
hdf5fileId[ithread] = H5Fcreate(completeFileName[ithread], H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
if(hdf5fileId[ithread]<0){
FILE_LOG(logERROR) << "Could not create/overwrite new file" << completeFileName[ithread] << ". Check permissions" << endl;
pthread_mutex_unlock(&writeMutex);
return FAIL;
}
}
//create dataspace and dataset
hsize_t srcdims[3] = {NY,NX,numberOfFrames};
if(dynamicRange == 4)
srcdims[1] = NX/2;
hdf5DataspaceId[ithread] = H5Screate_simple (3, srcdims, NULL);
bool error = true;
if(hdf5DataspaceId[ithread]>=0){
//fillvalues
hid_t hdf5Dcpl = H5Pcreate(H5P_DATASET_CREATE);
if(hdf5Dcpl>=0){
//fill it
int fill_value = -1;
if(H5Pset_fill_value(hdf5Dcpl, hdf5DataType, &fill_value)>=0){
//dataset
hdf5DatasetId[ithread] = H5Dcreate2 (hdf5fileId[ithread],"run", hdf5DataType, hdf5DataspaceId[ithread],
H5P_DEFAULT, hdf5Dcpl, H5P_DEFAULT);
if(hdf5DatasetId[ithread]>=0){
error = false;
}else cprintf(RED, "error in dataset creation\n");
}else cprintf(RED, "error in filling values\n");
}else cprintf(RED, "error in fill value id creation\n");
}else cprintf(RED, "error in dataspace creation\n");
if(error){
FILE_LOG(logERROR) << "Could not create dataspace/dataset/attribute for thread " << ithread << endl;
pthread_mutex_unlock(&writeMutex);
return FAIL;
}
pthread_mutex_unlock(&writeMutex);
}
#endif
//Print packet loss and filenames
if(totalWritingPacketCount[ithread]){
@ -1648,7 +1850,7 @@ int UDPStandardImplementation::createNewFile(int ithread){
printf("Thread:%d File opened:%s\n",ithread, completeFileName[ithread]);
//write file header
if(myDetectorType == EIGER)
if(myDetectorType == EIGER && fileFormatType == BINARY)
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
}
@ -2667,10 +2869,41 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){
//all threads need to close file, reset mask and exit loop
if(myDetectorType == EIGER && fileWriteEnable && (cbAction > DO_NOTHING)){
updateFileHeader(ithread);
fseek(sfilefd[ithread],0,0);
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
if(fileWriteEnable && (cbAction > DO_NOTHING)){
#ifdef HDF5C
if(fileFormatType == HDF5){
if(hdf5fileId[ithread]){
pthread_mutex_lock(&writeMutex);
// update attributes
bool error = true;
hsize_t dims = 1;
hid_t hdf5AttributeDataspaceId = H5Screate_simple (1, &dims, NULL);
if(hdf5AttributeDataspaceId>=0){
hid_t hdf5AttributeId = H5Acreate2(hdf5fileId[ithread],"Dynamic Range",H5T_STD_I32LE,hdf5AttributeDataspaceId,
H5P_DEFAULT, H5P_DEFAULT);
if(hdf5AttributeId>=0){
if(H5Awrite(hdf5AttributeId,H5T_STD_I32LE,&dynamicRange)>=0){
if(H5Aclose(hdf5AttributeId)>=0){
error = false;
}
}
}
}
if(error){
FILE_LOG(logERROR) << "Could not create attribute for " << completeFileName[ithread] << endl;
}
pthread_mutex_unlock(&writeMutex);
}
}
else
#endif
if(fileFormatType == BINARY && myDetectorType == EIGER){
updateFileHeader(ithread);
fseek(sfilefd[ithread],0,0);
fwrite((void*)fileHeader[ithread], 1, FILE_HEADER_SIZE, sfilefd[ithread]);
}
}
//Print packet loss
@ -2881,10 +3114,37 @@ void UDPStandardImplementation::handleCompleteFramesOnly(int ithread, char* wbuf
//write to file if enabled and update write parameters
if((fileWriteEnable) && (sfilefd[ithread])){
if((fileWriteEnable) && (sfilefd[ithread]||hdf5fileId[ithread])){
if(tempframenumber && (tempframenumber%maxFramesPerFile) == 0)
createNewFile(ithread);
fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, (bufferSize + FILE_FRAME_HEADER_LENGTH), sfilefd[ithread]);
#ifdef HDF5C
if(fileFormatType == HDF5){
pthread_mutex_lock(&writeMutex);
hsize_t dims2[2]={NY,NX};
hsize_t count[3] = {NY,NX,1};
if(dynamicRange == 4){
dims2[1] = NX/2;
count[1] = NX/2;
}
hid_t memspace = H5Screate_simple(2,dims2,NULL);
hsize_t start[3] = {0, 0, tempframenumber};
if(H5Sselect_hyperslab(hdf5DataspaceId[ithread], H5S_SELECT_SET, start, NULL, count, NULL)<0){
cprintf(RED,"could not create hyperslab %d \n",ithread);
}
if(H5Dwrite (hdf5DatasetId[ithread], hdf5DataType, memspace, hdf5DataspaceId[ithread],
H5P_DEFAULT, wbuffer + fifoBufferHeaderSize)<0){
cprintf(RED,"could not write dataset for thread %d \n",ithread);
}
H5Sclose(memspace);
pthread_mutex_unlock(&writeMutex);
}else
#endif
if(fileFormatType == BINARY)
fwrite(wbuffer + HEADER_SIZE_NUM_TOT_PACKETS, 1, (bufferSize + FILE_FRAME_HEADER_LENGTH), sfilefd[ithread]);
}