created virtual datasets for bunchid and subframenumber

This commit is contained in:
Dhanya Maliakal
2017-03-02 11:43:11 +01:00
parent a7309be567
commit 537260879a
9 changed files with 431 additions and 147 deletions

View File

@ -63,6 +63,7 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* tim
memset((void*)&timerBegin, 0, sizeof(timespec));
currentHeader = new char[255];
strcpy(fileNametoStream, "");
}
@ -121,9 +122,11 @@ void DataStreamer::ResetParametersforNewAcquisition() {
acquisitionStartedFlag = false;
}
void DataStreamer::ResetParametersforNewMeasurement(){
void DataStreamer::ResetParametersforNewMeasurement(char* fname){
firstMeasurementIndex = 0;
measurementStartedFlag = false;
strcpy(fileNametoStream, fname);
cprintf(BLUE,"fname:%s\n",fname);
CreateHeaderPart1();
}
@ -312,17 +315,15 @@ int DataStreamer::SendHeader(uint64_t fnum, uint32_t snum, bool dummy) {
uint64_t frameIndex = -1;
uint64_t acquisitionIndex = -1;
uint32_t subframeIndex = -1;
char fname[MAX_STR_LENGTH] = "run";
char buf[1000] = "";
if (!dummy) {
frameIndex = fnum - firstMeasurementIndex;
acquisitionIndex = fnum - firstAcquisitionIndex;
subframeIndex = snum;
/* fname to be included in fifo buffer? */
}
int len = sprintf(buf, jsonHeaderFormat, currentHeader, acquisitionIndex, frameIndex, subframeIndex, fname);
int len = sprintf(buf, jsonHeaderFormat, currentHeader, acquisitionIndex, frameIndex, subframeIndex, fileNametoStream);
#ifdef VERBOSE
printf("%d Streamer: buf:%s\n", index, buf);
#endif

View File

@ -26,6 +26,17 @@ HDF5File::HDF5File(int ind, int* nd, char* fname, char* fpath, uint64_t* findex,
dataspace(0),
dataset(0),
datatype(PredType::STD_U16LE),
dataspace_para(0),
para1("sub_frame_number"),
dataset_para1(0),
datatype_para1(PredType::STD_U32LE),
para2("bunch_id"),
dataset_para2(0),
datatype_para2(PredType::STD_U64LE),
nPixelsX(nx),
nPixelsY(ny),
numFramesInFile(0)
@ -86,12 +97,14 @@ int HDF5File::CreateFile(uint64_t fnum) {
if(!fnum) UpdateDataType();
uint64_t framestosave = ((*numImages - fnum) > maxFramesPerFile) ? maxFramesPerFile : (*numImages-fnum);
pthread_mutex_lock(&Mutex);
if (HDF5FileStatic::CreateDataFile(index, *overWriteEnable, currentFileName, *frameIndexEnable,
fnum, framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
datatype, filefd, dataspace, dataset,
HDF5_WRITER_VERSION, MAX_CHUNKED_IMAGES) == FAIL) {
HDF5_WRITER_VERSION, MAX_CHUNKED_IMAGES,
dataspace_para,
para1, dataset_para1, datatype_para1,
para2, dataset_para2, datatype_para2) == FAIL) {
pthread_mutex_unlock(&Mutex);
return FAIL;
}
@ -107,7 +120,7 @@ int HDF5File::CreateFile(uint64_t fnum) {
void HDF5File::CloseCurrentFile() {
pthread_mutex_lock(&Mutex);
HDF5FileStatic::CloseDataFile(index, filefd, dataspace, dataset);
HDF5FileStatic::CloseDataFile(index, filefd, dataspace, dataset, dataset_para1, dataset_para2);
if (master && (*detIndex==0)) {
HDF5FileStatic::CloseVirtualDataFile(virtualfd);
}
@ -116,7 +129,7 @@ void HDF5File::CloseCurrentFile() {
void HDF5File::CloseAllFiles() {
pthread_mutex_lock(&Mutex);
HDF5FileStatic::CloseDataFile(index, filefd, dataspace, dataset);
HDF5FileStatic::CloseDataFile(index, filefd, dataspace, dataset, dataset_para1, dataset_para2);
if (master && (*detIndex==0)) {
HDF5FileStatic::CloseMasterDataFile(masterfd);
HDF5FileStatic::CloseVirtualDataFile(virtualfd);
@ -126,17 +139,25 @@ void HDF5File::CloseAllFiles() {
int HDF5File::WriteToFile(char* buffer, int buffersize, uint64_t fnum) {
if (numFramesInFile >= maxFramesPerFile) {/**max *100?????????????*/
if (numFramesInFile >= maxFramesPerFile) {/**max multiply by 100?????????????*/
CloseCurrentFile();
CreateFile(fnum);
}
numFramesInFile++;
uint32_t snum = (*((uint32_t*)(buffer + FILE_FRAME_HDR_FNUM_SIZE)));
uint64_t bid = (*((uint64_t*)(buffer + FILE_FRAME_HDR_FNUM_SIZE + FILE_FRAME_HDR_SNUM_SIZE)));
pthread_mutex_lock(&Mutex);
if (HDF5FileStatic::WriteDataFile(index, buffer + FILE_FRAME_HEADER_SIZE, /** ignoring bunchid?????????? */
if (HDF5FileStatic::WriteDataFile(index, buffer + FILE_FRAME_HEADER_SIZE,
fnum%maxFramesPerFile, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
dataspace, dataset, datatype) == OK) {
pthread_mutex_unlock(&Mutex);
return OK;
if (HDF5FileStatic::WriteParameterDatasets(index, dataspace_para,
fnum%maxFramesPerFile,
dataset_para1, datatype_para1, &snum,
dataset_para2, datatype_para2, &bid) == OK) {
pthread_mutex_unlock(&Mutex);
return OK;
}
}
pthread_mutex_unlock(&Mutex);
cprintf(RED,"%d Error: Write to file failed\n", index);
@ -202,18 +223,31 @@ int HDF5File::CreateVirtualFile(uint64_t fnum) {
osfn << "/virtual_data";
if (*frameIndexEnable) osfn << "_f" << setfill('0') << setw(12) << fnum;
string virtualDatasetName = osfn.str();
//parameter 1 name
osfn.str(""); osfn.clear();
osfn << "/virtual_" << para1;
if (*frameIndexEnable) osfn << "_f" << setfill('0') << setw(12) << fnum;
string vpara1DatasetName = osfn.str();
//parameter 2 name
osfn.str(""); osfn.clear();
osfn << "/virtual_" << para2;
if (*frameIndexEnable) osfn << "_f" << setfill('0') << setw(12) << fnum;
string vpara2DatasetName = osfn.str();
uint64_t framestosave = ((*numImages - fnum) > maxFramesPerFile) ? maxFramesPerFile : (*numImages-fnum);
//create virtual file
pthread_mutex_lock(&Mutex);
int ret = HDF5FileStatic::CreateVirtualDataFile(virtualfd, virtualFileName, virtualDatasetName, srcDatasetName,
numReadouts, fileNames, *overWriteEnable, fnum, cdatatype,
para1, vpara1DatasetName, H5T_STD_U32LE,
para2, vpara2DatasetName, H5T_STD_U64LE,
numReadouts, fileNames, *overWriteEnable, cdatatype,
framestosave, nPixelsY, ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),
framestosave, numDetY * nPixelsY, numDetX * ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX),HDF5_WRITER_VERSION);
framestosave, numDetY * nPixelsY, numDetX * ((*dynamicRange==4) ? (nPixelsX/2) : nPixelsX), HDF5_WRITER_VERSION);
if (ret == OK)
ret = HDF5FileStatic::LinkVirtualInMaster(virtualFileName, virtualDatasetName, masterFileName);
ret = HDF5FileStatic::LinkVirtualInMaster(masterFileName, virtualFileName, virtualDatasetName, vpara1DatasetName, vpara2DatasetName);
pthread_mutex_unlock(&Mutex);
return ret;

View File

@ -29,7 +29,7 @@ pthread_mutex_t Listener::Mutex = PTHREAD_MUTEX_INITIALIZER;
const GeneralData* Listener::generalData(0);
Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf) :
Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act, uint64_t* nf, uint32_t* dr) :
ThreadObject(NumberofListeners),
fifo(f),
acquisitionStartedFlag(false),
@ -40,6 +40,7 @@ Listener::Listener(Fifo*& f, runStatus* s, uint32_t* portno, char* e, int* act,
eth(e),
activated(act),
numImages(nf),
dynamicRange(dr),
numTotalPacketsCaught(0),
numPacketsCaught(0),
firstAcquisitionIndex(0),
@ -302,7 +303,7 @@ uint32_t Listener::ListenToAnImage(char* buf) {
//look for carry over
if (carryOverFlag) {
//check if its the current image packet
generalData->GetHeaderInfo(index, carryOverPacket, fnum, pnum, snum, bid);
generalData->GetHeaderInfo(index, carryOverPacket, *dynamicRange, fnum, pnum, snum, bid);
if (fnum != currentFrameIndex) {
return generalData->imageSize;
}
@ -327,7 +328,7 @@ uint32_t Listener::ListenToAnImage(char* buf) {
//update parameters
numPacketsCaught++; //record immediately to get more time before socket shutdown
numTotalPacketsCaught++;
generalData->GetHeaderInfo(index, listeningPacket, fnum, pnum, snum, bid);
generalData->GetHeaderInfo(index, listeningPacket, *dynamicRange, fnum, pnum, snum, bid);
lastCaughtFrameIndex = fnum;
#ifdef VERBOSE
if (!index && !pnum) cprintf(GREEN,"Listening %d: fnum:%lld, pnum:%d\n", index, (long long int)fnum, pnum);

View File

@ -393,7 +393,7 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
//create threads
for ( int i=0; i < numThreads; ++i ) {
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames));
listener.push_back(new Listener(fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange));
dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable,
&callbackAction, rawDataReadyCallBack,pRawDataReady));
if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) {
@ -780,8 +780,13 @@ void UDPStandardImplementation::ResetParametersforNewMeasurement() {
(*it)->ResetParametersforNewMeasurement();
for (vector<DataProcessor*>::const_iterator it = dataProcessor.begin(); it != dataProcessor.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement();
if (dataStreamEnable) {
char fnametostream[1000];
sprintf(fnametostream, "%s/%s", filePath, fileName);
for (vector<DataStreamer*>::const_iterator it = dataStreamer.begin(); it != dataStreamer.end(); ++it)
(*it)->ResetParametersforNewMeasurement(fnametostream);
}
}