This commit is contained in:
maliakal_d 2021-06-28 12:17:46 +02:00
parent 746cb6b49d
commit 3295d36f46
9 changed files with 3730 additions and 174 deletions

View File

@ -0,0 +1,173 @@
/************************************************
* @file BinaryFile.cpp
* @short sets/gets properties for the binary file,
* creates/closes the file and writes data to it
***********************************************/
#include "BinaryFile.h"
#include "Fifo.h"
#include "MasterAttributes.h"
#include "receiver_defs.h"
#include <iomanip>
#include <iostream>
#include <string.h>
FILE *BinaryFile::masterfd = nullptr;
BinaryFile::BinaryFile(int ind, uint32_t *maxf, int *nd, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf, uint32_t *dr,
uint32_t *portno, bool *smode)
: File(ind, BINARY, maxf, nd, fname, fpath, findex, owenable, dindex,
nunits, nf, dr, portno, smode) {
#ifdef VERBOSE
PrintMembers();
#endif
}
BinaryFile::~BinaryFile() { CloseAllFiles(); }
void BinaryFile::PrintMembers(TLogLevel level) {
File::PrintMembers(level);
LOG(logINFO) << "Max Frames Per File: " << *maxFramesPerFile;
LOG(logINFO) << "Number of Frames in File: " << numFramesInFile;
}
void BinaryFile::CreateFile() {
numFramesInFile = 0;
std::ostringstream os;
os << *filePath << "/" << *fileNamePrefix << "_d"
<< (*detIndex * (*numUnitsPerDetector) + index) << "_f" << subFileIndex
<< '_' << *fileIndex << ".raw";
currentFileName = os.str();
if (!(*overWriteEnable)) {
if (nullptr ==
(filefd = fopen((const char *)currentFileName.c_str(), "wx"))) {
filefd = nullptr;
throw sls::RuntimeError("Could not create/overwrite file " +
currentFileName);
}
} else if (nullptr ==
(filefd = fopen((const char *)currentFileName.c_str(), "w"))) {
filefd = nullptr;
throw sls::RuntimeError("Could not create file " + currentFileName);
}
// setting to no file buffering
setvbuf(filefd, nullptr, _IONBF, 0);
if (!(*silentMode)) {
LOG(logINFO) << "[" << *udpPortNumber
<< "]: Binary File created: " << currentFileName;
}
}
void BinaryFile::CloseAllFiles() {
CloseCurrentDataFile();
CloseMasterFile();
}
void BinaryFile::CloseMasterFile() {
if (master) {
if (masterfd)
fclose(masterfd);
masterfd = nullptr;
}
}
void BinaryFile::CloseCurrentDataFile() {
if (filefd)
fclose(filefd);
filefd = nullptr;
}
int BinaryFile::WriteData(char *buf, int bsize) {
if (!filefd)
return 0;
return fwrite(buf, 1, bsize, filefd);
}
void BinaryFile::WriteToFile(char *buffer, int buffersize,
uint64_t currentFrameNumber,
uint32_t numPacketsCaught) {
// check if maxframesperfile = 0 for infinite
if ((*maxFramesPerFile) && (numFramesInFile >= (*maxFramesPerFile))) {
CloseCurrentFile();
++subFileIndex;
CreateFile();
}
numFramesInFile++;
// write to file
int ret = 0;
// contiguous bitset
if (sizeof(sls_bitset) == sizeof(bitset_storage)) {
ret = WriteData(buffer, buffersize);
}
// not contiguous bitset
else {
// write detector header
ret = WriteData(buffer, sizeof(sls_detector_header));
// get contiguous representation of bit mask
bitset_storage storage;
memset(storage, 0, sizeof(bitset_storage));
sls_bitset bits = *(sls_bitset *)(buffer + sizeof(sls_detector_header));
for (int i = 0; i < MAX_NUM_PACKETS; ++i)
storage[i >> 3] |= (bits[i] << (i & 7));
// write bitmask
ret += WriteData((char *)storage, sizeof(bitset_storage));
// write data
ret += WriteData(buffer + sizeof(sls_detector_header),
buffersize - sizeof(sls_receiver_header));
}
// if write error
if (ret != buffersize) {
throw sls::RuntimeError(std::to_string(index) +
" : Write to file failed for image number " +
std::to_string(currentFrameNumber));
}
}
void BinaryFile::CreateMasterFile(MasterAttributes *attr) {
if (master) {
std::ostringstream os;
os << *filePath << "/" << *fileNamePrefix << "_master"
<< "_" << *fileIndex << ".raw";
masterFileName = os.str();
if (!(*silentMode)) {
LOG(logINFO) << "Master File: " << masterFileName;
}
// create master file
if (!(*overWriteEnable)) {
if (nullptr == (masterfd = fopen(
(const char *)masterFileName.c_str(), "wx"))) {
masterfd = nullptr;
throw sls::RuntimeError("Could not create binary master file "
"(without overwrite enable) " +
masterFileName);
}
} else if (nullptr ==
(masterfd =
fopen((const char *)masterFileName.c_str(), "w"))) {
masterfd = nullptr;
throw sls::RuntimeError("Could not create binary master file "
"(with overwrite enable) " +
masterFileName);
}
attr->WriteMasterBinaryAttributes(masterfd);
if (masterfd)
fclose(masterfd);
masterfd = nullptr;
}
}
void BinaryFile::StartofAcquisition() { numFramesInFile = 0; }

View File

@ -0,0 +1,58 @@
#pragma once
/************************************************
* @file BinaryFile.h
* @short sets/gets properties for the binary file,
* creates/closes the file and writes data to it
***********************************************/
/**
*@short sets/gets properties for the binary file, creates/closes the file and
*writes data to it
*/
#include "File.h"
#include <string>
class BinaryFile : private virtual slsDetectorDefs, public File {
public:
/**
* Constructor
* creates the File Writer
* @param ind self index
* @param maxf pointer to max frames per file
* @param nd pointer to number of detectors in each dimension
* @param fname pointer to file name prefix
* @param fpath pointer to file path
* @param findex pointer to file index
* @param owenable pointer to over write enable
* @param dindex pointer to detector index
* @param nunits pointer to number of theads/ units per detector
* @param nf pointer to number of images in acquisition
* @param dr pointer to dynamic range
* @param portno pointer to udp port number for logging
* @param smode pointer to silent mode
*/
BinaryFile(int ind, uint32_t *maxf, int *nd, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf, uint32_t *dr,
uint32_t *portno, bool *smode);
~BinaryFile();
void PrintMembers(TLogLevel level = logDEBUG1) override;
void CreateFile() override;
void CreateMasterFile(MasterAttributes *attr) override;
void StartofAcquisition() override;
void CloseAllFiles() override;
void CloseCurrentDataFile() override;
void CloseMasterFile() override;
void WriteToFile(char *buffer, int buffersize, uint64_t currentFrameNumber,
uint32_t numPacketsCaught) override;
private:
int WriteData(char *buf, int bsize);
FILE *filefd = nullptr;
static FILE *masterfd;
uint32_t numFramesInFile = 0;
};

View File

@ -37,11 +37,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, bool act,
memset((void *)&timerBegin, 0, sizeof(timespec));
}
DataProcessor::~DataProcessor() {
delete file;
delete masterFile;
delete virtualFile;
}
DataProcessor::~DataProcessor() {}
/** getters */
@ -76,138 +72,7 @@ void DataProcessor::RecordFirstIndex(uint64_t fnum) {
LOG(logDEBUG1) << index << " First Index:" << firstIndex;
}
void DataProcessor::SetGeneralData(GeneralData *g) {
generalData = g;
if (file != nullptr) {
if (file->GetFileType() == HDF5) {
file->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
if (masterFile != nullptr) {
if (masterFile->GetFileType() == HDF5) {
masterFile->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
if (virtualFile != nullptr) {
if (virtualFile->GetFileType() == HDF5) {
virtualFile->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
}
void DataProcessor::SetupFileWriter(fileFormat ftype, bool fwe, int act,
int depaden, int *nd, uint32_t *maxf,
std::string *fname, std::string *fpath,
uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf,
uint32_t *dr, uint32_t *portno,
GeneralData *g) {
activated = act;
deactivatedPaddingEnable = depaden;
if (g != nullptr)
generalData = g;
// close existing file objects
if (file != nullptr) {
delete file;
file = nullptr;
}
if (masterFile != nullptr) {
delete masterFile;
masterFile = nullptr;
}
if (virtualFile != nullptr) {
delete virtualFile;
virtualFile = nullptr;
}
// skip data file writing for deactivated non padded parts
bool skipDataFileWriting = false;
if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) {
skipDataFileWriting = true;
}
// create file objects
if (fwe) {
switch (fileFormatType) {
#ifdef HDF5C
case HDF5:
// data file
if (!skipDataFileWriting) {
file = new HDF5File(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, portno,
generalData->nPixelsX,
generalData->nPixelsY, silentMode);
}
// master file
if ((index == 0) && (*dindex == 0)) {
masterFile = new HDF5File(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr,
portno, generalData->nPixelsX,
generalData->nPixelsY, silentMode);
virtualFile = new HDF5File(index, maxf, nd, fname, fpath,
findex, owenable, dindex, nunits, nf,
dr, portno, generalData->nPixelsX,
generalData->nPixelsY, silentMode);
}
break;
#endif
default:
// data file
if (!skipDataFileWriting) {
file = new BinaryFile(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, portno,
silentMode);
}
// master file
if ((index == 0) && (*dindex == 0)) {
masterFile = new BinaryFile(index, maxf, nd, fname, fpath,
findex, owenable, dindex, nunits,
nf, dr, portno, silentMode);
}
break;
}
}
}
void DataProcessor::CreateMasterFile(MasterAttributes *attr) {
if (masterFile == nullptr) {
throw sls::RuntimeError("master file object not contstructed");
}
masterFile->CloseMasterFile();
masterFile->CreateMasterFile(attr);
}
void DataProcessor::CreateFirstDataFile() {
if (file == nullptr) {
throw sls::RuntimeError("file object not contstructed");
}
file->CloseCurrentDataFile();
file->resetSubFileIndex();
file->StartofAcquisition();
// do not create file if deactivated and no padding
if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) {
return;
}
file->CreateFile();
}
void DataProcessor::CloseFiles() {
if (file != nullptr)
file->CloseAllFiles();
}
void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
if ((file != nullptr) && file->GetFileType() == HDF5) {
try {
file->EndofAcquisition(anyPacketsCaught, numf);
} catch (const sls::RuntimeError &e) {
; // ignore for now //TODO: send error to client via stop receiver
}
}
}
void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; }
void DataProcessor::ThreadExecution() {
char *buffer = nullptr;

View File

@ -119,34 +119,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* @param portno pointer to udp port number
* @param g address of GeneralData (Detector Data) pointer
*/
void SetupFileWriter(fileFormat ftype, bool fwe, int act, int depaden,
int *nd, uint32_t *maxf, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf, uint32_t *dr,
uint32_t *portno, GeneralData *g = nullptr);
/**
* Create Master File (also virtual if hdf5)
* @param attr master file attributes
*/
void CreateMasterFile(MasterAttributes *attr);
/**
* Create First Data File
*/
void CreatFirsteDataFile();
/**
* Closes files
*/
void CloseFiles();
/**
* End of Acquisition
* @param anyPacketsCaught true if any packets are caught, else false
* @param numf number of images caught
*/
void EndofAcquisition(bool anyPacketsCaught, uint64_t numf);
/**
* Update pixel dimensions in file writer
@ -252,15 +224,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
/** Detector Type */
detectorType myDetectorType;
/** File writer implemented as binary or hdf5 File */
File *file{nullptr};
/** master file */
File *masterFile{nullptr};
/** virtual file (for hdf5) */
File *virtualFile{nullptr};
/** Data Stream Enable */
bool *dataStreamEnable;

View File

@ -0,0 +1,515 @@
/************************************************
* @file DataProcessor.cpp
* @short creates data processor thread that
* pulls pointers to memory addresses from fifos
* and processes data stored in them & writes them to file
***********************************************/
#include "DataProcessor.h"
#include "BinaryFile.h"
#include "Fifo.h"
#include "GeneralData.h"
#include "MasterAttributes.h"
#ifdef HDF5C
#include "HDF5File.h"
#endif
#include "DataStreamer.h"
#include "sls/sls_detector_exceptions.h"
#include <cerrno>
#include <cstring>
#include <iostream>
const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, bool act,
bool depaden, bool *dsEnable, uint32_t *freq,
uint32_t *timer, uint32_t *sfnum, bool *fp,
bool *sm, std::vector<int> *cdl, int *cdo,
int *cad)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype),
dataStreamEnable(dsEnable), activated(act),
deactivatedPaddingEnable(depaden), streamingFrequency(freq),
streamingTimerInMs(timer), streamingStartFnum(sfnum), silentMode(sm),
framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad), firstStreamerFrame(false) {
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void *)&timerBegin, 0, sizeof(timespec));
}
DataProcessor::~DataProcessor() {
delete file;
delete masterFile;
delete virtualFile;
}
/** getters */
bool DataProcessor::GetStartedFlag() { return startedFlag; }
uint64_t DataProcessor::GetNumFramesCaught() { return numFramesCaught; }
uint64_t DataProcessor::GetCurrentFrameIndex() { return currentFrameIndex; }
uint64_t DataProcessor::GetProcessedIndex() {
return currentFrameIndex - firstIndex;
}
void DataProcessor::SetFifo(Fifo *f) { fifo = f; }
void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag = false;
numFramesCaught = 0;
firstIndex = 0;
currentFrameIndex = 0;
firstStreamerFrame = true;
}
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
// listen to this fnum, later +1
currentFrameIndex = fnum;
startedFlag = true;
firstIndex = fnum;
LOG(logDEBUG1) << index << " First Index:" << firstIndex;
}
void DataProcessor::SetGeneralData(GeneralData *g) {
generalData = g;
if (file != nullptr) {
if (file->GetFileType() == HDF5) {
file->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
if (masterFile != nullptr) {
if (masterFile->GetFileType() == HDF5) {
masterFile->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
if (virtualFile != nullptr) {
if (virtualFile->GetFileType() == HDF5) {
virtualFile->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
}
void DataProcessor::SetupFileWriter(fileFormat ftype, bool fwe, int act,
int depaden, int *nd, uint32_t *maxf,
std::string *fname, std::string *fpath,
uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf,
uint32_t *dr, uint32_t *portno,
GeneralData *g) {
activated = act;
deactivatedPaddingEnable = depaden;
if (g != nullptr)
generalData = g;
// close existing file objects
if (file != nullptr) {
delete file;
file = nullptr;
}
if (masterFile != nullptr) {
delete masterFile;
masterFile = nullptr;
}
if (virtualFile != nullptr) {
delete virtualFile;
virtualFile = nullptr;
}
// skip data file writing for deactivated non padded parts
bool skipDataFileWriting = false;
if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) {
skipDataFileWriting = true;
}
// create file objects
if (fwe) {
switch (fileFormatType) {
#ifdef HDF5C
case HDF5:
// data file
if (!skipDataFileWriting) {
file = new HDF5File(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, portno,
generalData->nPixelsX,
generalData->nPixelsY, silentMode);
}
// master file
if ((index == 0) && (*dindex == 0)) {
masterFile = new HDF5File(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr,
portno, generalData->nPixelsX,
generalData->nPixelsY, silentMode);
virtualFile = new HDF5File(index, maxf, nd, fname, fpath,
findex, owenable, dindex, nunits, nf,
dr, portno, generalData->nPixelsX,
generalData->nPixelsY, silentMode);
}
break;
#endif
default:
// data file
if (!skipDataFileWriting) {
file = new BinaryFile(index, maxf, nd, fname, fpath, findex,
owenable, dindex, nunits, nf, dr, portno,
silentMode);
}
// master file
if ((index == 0) && (*dindex == 0)) {
masterFile = new BinaryFile(index, maxf, nd, fname, fpath,
findex, owenable, dindex, nunits,
nf, dr, portno, silentMode);
}
break;
}
}
}
void DataProcessor::CreateMasterFile(MasterAttributes *attr) {
if (masterFile == nullptr) {
throw sls::RuntimeError("master file object not contstructed");
}
masterFile->CloseMasterFile();
masterFile->CreateMasterFile(attr);
}
void DataProcessor::CreateFirstDataFile() {
if (file == nullptr) {
throw sls::RuntimeError("file object not contstructed");
}
file->CloseCurrentDataFile();
file->resetSubFileIndex();
file->StartofAcquisition();
// do not create file if deactivated and no padding
if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) {
return;
}
file->CreateFile();
}
void DataProcessor::CloseFiles() {
if (file != nullptr)
file->CloseAllFiles();
}
void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) {
if ((file != nullptr) && file->GetFileType() == HDF5) {
try {
file->EndofAcquisition(anyPacketsCaught, numf);
} catch (const sls::RuntimeError &e) {
; // ignore for now //TODO: send error to client via stop receiver
}
}
}
void DataProcessor::ThreadExecution() {
char *buffer = nullptr;
fifo->PopAddress(buffer);
LOG(logDEBUG5) << "DataProcessor " << index
<< ", "
"pop 0x"
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
// check dummy
auto numBytes = (uint32_t)(*((uint32_t *)buffer));
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
uint64_t fnum = 0;
try {
fnum = ProcessAnImage(buffer);
} catch (const std::exception &e) {
fifo->FreeAddress(buffer);
return;
}
// stream (if time/freq to stream) or free
if (*dataStreamEnable && SendToStreamer()) {
// if first frame to stream, add frame index to fifo header (might
// not be the first)
if (firstStreamerFrame) {
firstStreamerFrame = false;
(*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) =
(uint32_t)(fnum - firstIndex);
}
fifo->PushAddressToStream(buffer);
} else {
fifo->FreeAddress(buffer);
}
}
void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
// stream or free
if (*dataStreamEnable)
fifo->PushAddressToStream(buf);
else
fifo->FreeAddress(buf);
if (file != nullptr)
file->CloseCurrentFile();
StopRunning();
LOG(logDEBUG1) << index << ": Processing Completed";
}
uint64_t DataProcessor::ProcessAnImage(char *buf) {
auto *rheader = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
sls_detector_header header = rheader->detHeader;
uint64_t fnum = header.frameNumber;
currentFrameIndex = fnum;
uint32_t nump = header.packetNumber;
if (nump == generalData->packetsPerFrame) {
numFramesCaught++;
}
LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum;
if (!startedFlag) {
RecordFirstIndex(fnum);
if (*dataStreamEnable) {
// restart timer
clock_gettime(CLOCK_REALTIME, &timerBegin);
timerBegin.tv_sec -= (*streamingTimerInMs) / 1000;
timerBegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000;
// to send first image
currentFreqCount = *streamingFrequency - *streamingStartFnum;
}
}
// frame padding
if (activated && *framePadding && nump < generalData->packetsPerFrame)
PadMissingPackets(buf);
// deactivated and padding enabled
else if (!activated && deactivatedPaddingEnable)
PadMissingPackets(buf);
// rearrange ctb digital bits (if ctbDbitlist is not empty)
if (!(*ctbDbitList).empty()) {
RearrangeDbitData(buf);
}
try {
// normal call back
if (rawDataReadyCallBack != nullptr) {
rawDataReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
(uint32_t)(*((uint32_t *)buf)), pRawDataReady);
}
// call back with modified size
else if (rawDataModifyReadyCallBack != nullptr) {
auto revsize = (uint32_t)(*((uint32_t *)buf));
rawDataModifyReadyCallBack((char *)rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
revsize, pRawDataReady);
(*((uint32_t *)buf)) = revsize;
}
} catch (const std::exception &e) {
throw sls::RuntimeError("Get Data Callback Error: " +
std::string(e.what()));
}
// write to file
if (file != nullptr) {
try {
file->WriteToFile(
buf + FIFO_HEADER_NUMBYTES,
sizeof(sls_receiver_header) +
(uint32_t)(*((uint32_t *)buf)), //+ size of data (resizable
// from previous call back
fnum - firstIndex, nump);
} catch (const sls::RuntimeError &e) {
; // ignore write exception for now (TODO: send error message
// via stopReceiver tcp)
}
}
return fnum;
}
bool DataProcessor::SendToStreamer() {
// skip
if ((*streamingFrequency) == 0u) {
if (!CheckTimer())
return false;
} else {
if (!CheckCount())
return false;
}
return true;
}
bool DataProcessor::CheckTimer() {
struct timespec end;
clock_gettime(CLOCK_REALTIME, &end);
LOG(logDEBUG1) << index << " Timer elapsed time:"
<< ((end.tv_sec - timerBegin.tv_sec) +
(end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0)
<< " seconds";
// still less than streaming timer, keep waiting
if (((end.tv_sec - timerBegin.tv_sec) +
(end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0) <
((double)*streamingTimerInMs / 1000.00))
return false;
// restart timer
clock_gettime(CLOCK_REALTIME, &timerBegin);
return true;
}
bool DataProcessor::CheckCount() {
if (currentFreqCount == *streamingFrequency) {
currentFreqCount = 1;
return true;
}
currentFreqCount++;
return false;
}
void DataProcessor::SetPixelDimension() {
if (file != nullptr) {
if (file->GetFileType() == HDF5) {
file->SetNumberofPixels(generalData->nPixelsX,
generalData->nPixelsY);
}
}
}
void DataProcessor::registerCallBackRawDataReady(void (*func)(char *, char *,
uint32_t, void *),
void *arg) {
rawDataReadyCallBack = func;
pRawDataReady = arg;
}
void DataProcessor::registerCallBackRawDataModifyReady(
void (*func)(char *, char *, uint32_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
}
void DataProcessor::PadMissingPackets(char *buf) {
LOG(logDEBUG) << index << ": Padding Missing Packets";
uint32_t pperFrame = generalData->packetsPerFrame;
auto *header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
uint32_t nmissing = pperFrame - header->detHeader.packetNumber;
sls_bitset pmask = header->packetsMask;
uint32_t dsize = generalData->dataSize;
if (myDetectorType == GOTTHARD2 && index != 0) {
dsize = generalData->vetoDataSize;
}
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
uint32_t corrected_dsize =
dsize - ((pperFrame * dsize) - generalData->imageSize);
LOG(logDEBUG1) << "bitmask: " << pmask.to_string();
for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) {
// not missing packet
if (pmask[pnum])
continue;
// done with padding, exit loop earlier
if (nmissing == 0u)
break;
LOG(logDEBUG) << "padding for " << index << " for pnum: " << pnum
<< std::endl;
// missing packet
switch (myDetectorType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes
// data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data +
// 640*2 bytes data !!
case GOTTHARD:
if (pnum == 0u)
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize - 2);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize + 2);
break;
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame - 1))
memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
break;
default:
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
break;
}
--nmissing;
}
}
/** ctb specific */
void DataProcessor::RearrangeDbitData(char *buf) {
// TODO! (Erik) Refactor and add tests
int totalSize = (int)(*((uint32_t *)buf));
int ctbDigitalDataBytes =
totalSize - (*ctbAnalogDataBytes) - (*ctbDbitOffset);
// no digital data
if (ctbDigitalDataBytes == 0) {
LOG(logWARNING)
<< "No digital data for call back, yet dbitlist is not empty.";
return;
}
const int numSamples = (ctbDigitalDataBytes / sizeof(uint64_t));
const int digOffset = FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header) +
(*ctbAnalogDataBytes);
// ceil as numResult8Bits could be decimal
const int numResult8Bits =
ceil((double)(numSamples * (*ctbDbitList).size()) / 8.00);
std::vector<uint8_t> result(numResult8Bits);
uint8_t *dest = &result[0];
auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset));
// loop through digital bit enable vector
int bitoffset = 0;
for (auto bi : (*ctbDbitList)) {
// where numbits * numsamples is not a multiple of 8
if (bitoffset != 0) {
bitoffset = 0;
++dest;
}
// loop through the frame digital data
for (auto ptr = source; ptr < (source + numSamples);) {
// get selected bit from each 8 bit
uint8_t bit = (*ptr++ >> bi) & 1;
*dest |= bit << bitoffset;
++bitoffset;
// extract destination in 8 bit batches
if (bitoffset == 8) {
bitoffset = 0;
++dest;
}
}
}
// copy back to buf and update size
memcpy(buf + digOffset, result.data(), numResult8Bits * sizeof(uint8_t));
(*((uint32_t *)buf)) = numResult8Bits * sizeof(uint8_t);
}

View File

@ -0,0 +1,343 @@
#pragma once
/************************************************
* @file DataProcessor.h
* @short creates data processor thread that
* pulls pointers to memory addresses from fifos
* and processes data stored in them & writes them to file
***********************************************/
/**
*@short creates & manages a data processor thread each
*/
#include "ThreadObject.h"
#include "receiver_defs.h"
class GeneralData;
class Fifo;
class File;
class DataStreamer;
struct MasterAttributes;
#include <atomic>
#include <vector>
class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
public:
/**
* Constructor
* Calls Base Class CreateThread(), sets ErrorMask if error and increments
* NumberofDataProcessors
* @param ind self index
* @param dtype detector type
* @param f address of Fifo pointer
* @param act activated
* @param depaden deactivated padding enable
* @param dsEnable pointer to data stream enable
* @param dr pointer to dynamic range
* @param freq pointer to streaming frequency
* @param timer pointer to timer if streaming frequency is random
* @param sfnum pointer to streaming starting fnum
* @param fp pointer to frame padding enable
* @param sm pointer to silent mode
* @param qe pointer to quad Enable
* @param cdl pointer to vector or ctb digital bits enable
* @param cdo pointer to digital bits offset
* @param cad pointer to ctb analog databytes
*/
DataProcessor(int ind, detectorType dtype, Fifo *f, bool act, bool depaden,
bool *dsEnable, uint32_t *freq, uint32_t *timer,
uint32_t *sfnum, bool *fp, bool *sm, std::vector<int> *cdl,
int *cdo, int *cad);
/**
* Destructor
* Calls Base Class DestroyThread() and decrements NumberofDataProcessors
*/
~DataProcessor() override;
//*** getters ***
/**
* Get acquisition started flag
* @return acquisition started flag
*/
bool GetStartedFlag();
/**
* Get Frames Complete Caught
* @return number of frames
*/
uint64_t GetNumFramesCaught();
/**
* Gets Actual Current Frame Index (that has not been subtracted from
* firstIndex) thats been processed
* @return -1 if no frames have been caught, else current frame index
*/
uint64_t GetCurrentFrameIndex();
/**
* Get Current Frame Index thats been processed
* @return -1 if no frames have been caught, else current frame index
*/
uint64_t GetProcessedIndex();
/**
* Set Fifo pointer to the one given
* @param f address of Fifo pointer
*/
void SetFifo(Fifo *f);
/**
* Reset parameters for new acquisition
*/
void ResetParametersforNewAcquisition();
/**
* Set GeneralData pointer to the one given
* @param g address of GeneralData (Detector Data) pointer
*/
void SetGeneralData(GeneralData *g);
/**
* Set up file writer object and call backs
* @param ftype file format
* @param fwe file write enable
* @param act activated
* @param depad deactivated padding enable
* @param nd pointer to number of detectors in each dimension
* @param maxf pointer to max frames per file
* @param fname pointer to file name prefix
* @param fpath pointer to file path
* @param findex pointer to file index
* @param owenable pointer to over write enable
* @param dindex pointer to detector index
* @param nunits pointer to number of threads/ units per detector
* @param nf pointer to number of images in acquisition
* @param dr pointer to dynamic range
* @param portno pointer to udp port number
* @param g address of GeneralData (Detector Data) pointer
*/
void SetupFileWriter(fileFormat ftype, bool fwe, int act, int depaden,
int *nd, uint32_t *maxf, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf, uint32_t *dr,
uint32_t *portno, GeneralData *g = nullptr);
/**
* Create Master File (also virtual if hdf5)
* @param attr master file attributes
*/
void CreateMasterFile(MasterAttributes *attr);
/**
* Create First Data File
*/
void CreatFirsteDataFile();
/**
* Closes files
*/
void CloseFiles();
/**
* End of Acquisition
* @param anyPacketsCaught true if any packets are caught, else false
* @param numf number of images caught
*/
void EndofAcquisition(bool anyPacketsCaught, uint64_t numf);
/**
* Update pixel dimensions in file writer
*/
void SetPixelDimension();
/**
* Call back for raw data
* args to raw data ready callback are
* sls_receiver_header frame metadata
* dataPointer is the pointer to the data
* dataSize in bytes is the size of the data in bytes.
*/
void registerCallBackRawDataReady(void (*func)(char *, char *, uint32_t,
void *),
void *arg);
/**
* Call back for raw data (modified)
* args to raw data ready callback are
* sls_receiver_header frame metadata
* dataPointer is the pointer to the data
* revDatasize is the reference of data size in bytes.
* Can be modified to the new size to be written/streamed. (only smaller
* value).
*/
void registerCallBackRawDataModifyReady(void (*func)(char *, char *,
uint32_t &, void *),
void *arg);
private:
/**
* Record First Index
* @param fnum frame index to record
*/
void RecordFirstIndex(uint64_t fnum);
/**
* Thread Exeution for DataProcessor Class
* Pop bound addresses, process them,
* write to file if needed & free the address
*/
void ThreadExecution() override;
/**
* Frees dummy buffer,
* reset running mask by calling StopRunning()
* @param buf address of pointer
*/
void StopProcessing(char *buf);
/**
* Process an image popped from fifo,
* write to file if fw enabled & update parameters
* @param buf address of pointer
* @returns frame number
*/
uint64_t ProcessAnImage(char *buf);
/**
* Calls CheckTimer and CheckCount for streaming frequency and timer
* and determines if the current image should be sent to streamer
* @returns true if it should to streamer, else false
*/
bool SendToStreamer();
/**
* This function should be called only in random frequency mode
* Checks if timer is done and ready to send to stream
* @returns true if ready to send to stream, else false
*/
bool CheckTimer();
/**
* This function should be called only in non random frequency mode
* Checks if count is done and ready to send to stream
* @returns true if ready to send to stream, else false
*/
bool CheckCount();
/**
* Pad Missing Packets from the bit mask
* @param buf buffer
*/
void PadMissingPackets(char *buf);
/**
* Align corresponding digital bits together (CTB only if ctbDbitlist is not
* empty)
*/
void RearrangeDbitData(char *buf);
/** type of thread */
static const std::string TypeName;
/** GeneralData (Detector Data) object */
const GeneralData *generalData{nullptr};
/** Fifo structure */
Fifo *fifo;
// individual members
/** Detector Type */
detectorType myDetectorType;
/** File writer implemented as binary or hdf5 File */
File *file{nullptr};
/** master file */
File *masterFile{nullptr};
/** virtual file (for hdf5) */
File *virtualFile{nullptr};
/** Data Stream Enable */
bool *dataStreamEnable;
/** Activated/Deactivated */
bool activated;
/** Deactivated padding enable */
bool deactivatedPaddingEnable;
/** Pointer to Streaming frequency, if 0, sending random images with a timer
*/
uint32_t *streamingFrequency;
/** Pointer to the timer if Streaming frequency is random */
uint32_t *streamingTimerInMs;
/** Pointer to streaming starting fnum */
uint32_t *streamingStartFnum;
/** Current frequency count */
uint32_t currentFreqCount{0};
/** timer beginning stamp for random streaming */
struct timespec timerBegin;
/** Silent Mode */
bool *silentMode;
/** frame padding */
bool *framePadding;
/** ctb digital bits enable list */
std::vector<int> *ctbDbitList;
/** ctb digital bits offset */
int *ctbDbitOffset;
/** ctb analog databytes */
int *ctbAnalogDataBytes;
// acquisition start
/** Aquisition Started flag */
std::atomic<bool> startedFlag{false};
/** Frame Number of First Frame */
std::atomic<uint64_t> firstIndex{0};
// for statistics
/** Number of complete frames caught */
uint64_t numFramesCaught{0};
/** Frame Number of latest processed frame number */
std::atomic<uint64_t> currentFrameIndex{0};
/** first streamer frame to add frame index in fifo header */
bool firstStreamerFrame{false};
// call back
/**
* Call back for raw data
* args to raw data ready callback are
* sls_receiver_header frame metadata
* dataPointer is the pointer to the data
* dataSize in bytes is the size of the data in bytes.
*/
void (*rawDataReadyCallBack)(char *, char *, uint32_t, void *) = nullptr;
/**
* Call back for raw data (modified)
* args to raw data ready callback are
* sls_receiver_header frame metadata
* dataPointer is the pointer to the data
* revDatasize is the reference of data size in bytes. Can be modified to
* the new size to be written/streamed. (only smaller value).
*/
void (*rawDataModifyReadyCallBack)(char *, char *, uint32_t &,
void *) = nullptr;
void *pRawDataReady{nullptr};
};

View File

@ -0,0 +1,830 @@
/************************************************
* @file HDF5File.cpp
* @short sets/gets properties for the HDF5 file,
* creates/closes the file and writes data to it
***********************************************/
#include "HDF5File.h"
#include "Fifo.h"
#include "MasterAttributes.h"
#include "receiver_defs.h"
#include <iomanip>
#include <iostream>
#include <libgen.h> //basename
#include <string.h>
std::mutex HDF5File::hdf5Lib;
HDF5File::HDF5File(int ind, uint32_t *maxf, int *nd, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable,
int *dindex, int *nunits, uint64_t *nf, uint32_t *dr,
uint32_t *portno, uint32_t nx, uint32_t ny, bool *smode)
:
File(ind, HDF5, maxf, nd, fname, fpath, findex, owenable, dindex, nunits,
nf, dr, portno, smode),
masterfd(nullptr), virtualfd(0), filefd(nullptr), dataspace(nullptr),
dataset(nullptr), datatype(PredType::STD_U16LE), nPixelsX(nx),
nPixelsY(ny), numFramesInFile(0), numFilesinAcquisition(0),
dataspace_para(nullptr), extNumImages(0) {
PrintMembers();
dataset_para.clear();
parameterNames.clear();
parameterDataTypes.clear();
parameterNames = std::vector<std::string>{
"frame number",
"exp length or sub exposure time",
"packets caught",
"bunch id",
"timestamp",
"mod id",
"row",
"column",
"reserved",
"debug",
"round robin number",
"detector type",
"detector header version",
"packets caught bit mask",
};
StrType strdatatype(PredType::C_S1, sizeof(bitset_storage));
parameterDataTypes = std::vector<DataType>{
PredType::STD_U64LE, PredType::STD_U32LE, PredType::STD_U32LE,
PredType::STD_U64LE, PredType::STD_U64LE, PredType::STD_U16LE,
PredType::STD_U16LE, PredType::STD_U16LE, PredType::STD_U16LE,
PredType::STD_U32LE, PredType::STD_U16LE, PredType::STD_U8LE,
PredType::STD_U8LE, strdatatype};
}
HDF5File::~HDF5File() { CloseAllFiles(); }
void HDF5File::SetNumberofPixels(uint32_t nx, uint32_t ny) {
nPixelsX = nx;
nPixelsY = ny;
}
void HDF5File::CreateFile() {
numFilesinAcquisition++;
numFramesInFile = 0;
// first time
if (subFileIndex == 0u) {
switch (*dynamicRange) {
case 16:
datatype = PredType::STD_U16LE;
break;
case 32:
datatype = PredType::STD_U32LE;
break;
default:
datatype = PredType::STD_U8LE;
break;
}
}
CreateDataFile();
}
void HDF5File::CloseAllFiles() {
CloseCurrentDataFile();
CloseMasterFile();
}
void HDF5File::CloseMasterFile() {
if (master) {
CloseFile(masterfd, true);
// close virtual file
// c code due to only c implementation of H5Pset_virtual available
if (virtualfd != 0) {
if (H5Fclose(virtualfd) < 0) {
LOG(logERROR) << "Could not close virtual HDF5 handles";
}
virtualfd = 0;
}
}
}
void HDF5File::CloseCurrentDataFile() {
CloseFile(filefd, false);
for (unsigned int i = 0; i < dataset_para.size(); ++i)
delete dataset_para[i];
dataset_para.clear();
if (dataspace_para) {
delete dataspace_para;
dataspace_para = nullptr;
}
if (dataset) {
delete dataset;
dataset = nullptr;
}
if (dataspace) {
delete dataspace;
dataspace = nullptr;
}
}
void HDF5File::WriteToFile(char *buffer, int bufferSize,
uint64_t currentFrameNumber,
uint32_t numPacketsCaught) {
// check if maxframesperfile = 0 for infinite
if ((*maxFramesPerFile) && (numFramesInFile >= (*maxFramesPerFile))) {
CloseCurrentFile();
++subFileIndex;
CreateFile();
}
numFramesInFile++;
// extend dataset (when receiver start followed by many status starts
// (jungfrau)))
if (currentFrameNumber >= extNumImages) {
ExtendDataset();
}
WriteDataFile(currentFrameNumber, buffer + sizeof(sls_receiver_header));
WriteParameterDatasets(currentFrameNumber, (sls_receiver_header *)(buffer));
}
void HDF5File::CreateMasterFile(MasterAttributes *attr) {
if (master) {
virtualfd = 0;
CreateMasterDataFile(attr);
}
}
void HDF5File::StartofAcquisition() {
numFilesinAcquisition = 0;
numFramesInFile = 0;
extNumImages = *numImages;
}
void HDF5File::EndofAcquisition(bool anyPacketsCaught,
uint64_t numImagesCaught) {
// not created before
if (!virtualfd && anyPacketsCaught) {
// called only by the one maser receiver
if (master && masterfd != nullptr) {
// only one file and one sub image (link current file in master)
if (((numFilesinAcquisition == 1) && (numDetY * numDetX) == 1)) {
// dataset name
std::ostringstream oss;
oss << "/data";
if ((*numImages > 1)) {
oss << "_f" << std::setfill('0') << std::setw(12) << 0;
}
std::string dsetname = oss.str();
LinkVirtualInMaster(currentFileName, dsetname);
}
// create virutal file
else {
CreateVirtualDataFile(
// infinite images in 1 file, then maxfrperfile =
// numImagesCaught
((*maxFramesPerFile == 0) ? numImagesCaught + 1
: *maxFramesPerFile),
numImagesCaught + 1);
}
}
}
numFilesinAcquisition = 0;
}
void HDF5File::CloseFile(H5File *&fd, bool masterFile) {
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
try {
Exception::dontPrint(); // to handle errors
if (fd) {
fd->close();
delete fd;
fd = nullptr;
}
} catch (const Exception &error) {
LOG(logERROR) << "Could not close " << (masterFile ? "master" : "data")
<< " HDF5 handles of index " << index;
error.printErrorStack();
}
}
void HDF5File::WriteDataFile(uint64_t currentFrameNumber, char *buffer) {
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
uint64_t nDimx =
((*maxFramesPerFile == 0) ? currentFrameNumber
: currentFrameNumber % (*maxFramesPerFile));
uint32_t nDimy = nPixelsY;
uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX);
hsize_t count[3] = {1, nDimy, nDimz};
hsize_t start[3] = {nDimx, 0, 0};
hsize_t dims2[2] = {nDimy, nDimz};
try {
Exception::dontPrint(); // to handle errors
dataspace->selectHyperslab(H5S_SELECT_SET, count, start);
DataSpace memspace(2, dims2);
dataset->write(buffer, datatype, memspace, *dataspace);
memspace.close();
} catch (const Exception &error) {
LOG(logERROR) << "Could not write to file in object " << index;
error.printErrorStack();
throw sls::RuntimeError("Could not write to file in object " +
std::to_string(index));
}
}
void HDF5File::WriteParameterDatasets(uint64_t currentFrameNumber,
sls_receiver_header *rheader) {
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
uint64_t fnum =
((*maxFramesPerFile == 0) ? currentFrameNumber
: currentFrameNumber % (*maxFramesPerFile));
sls_detector_header header = rheader->detHeader;
hsize_t count[1] = {1};
hsize_t start[1] = {fnum};
int i = 0;
try {
Exception::dontPrint(); // to handle errors
dataspace_para->selectHyperslab(H5S_SELECT_SET, count, start);
DataSpace memspace(H5S_SCALAR);
dataset_para[0]->write(&header.frameNumber, parameterDataTypes[0],
memspace, *dataspace_para);
i = 1;
dataset_para[1]->write(&header.expLength, parameterDataTypes[1],
memspace, *dataspace_para);
i = 2;
dataset_para[2]->write(&header.packetNumber, parameterDataTypes[2],
memspace, *dataspace_para);
i = 3;
dataset_para[3]->write(&header.bunchId, parameterDataTypes[3], memspace,
*dataspace_para);
i = 4;
dataset_para[4]->write(&header.timestamp, parameterDataTypes[4],
memspace, *dataspace_para);
i = 5;
dataset_para[5]->write(&header.modId, parameterDataTypes[5], memspace,
*dataspace_para);
i = 6;
dataset_para[6]->write(&header.row, parameterDataTypes[6], memspace,
*dataspace_para);
i = 7;
dataset_para[7]->write(&header.column, parameterDataTypes[7], memspace,
*dataspace_para);
i = 8;
dataset_para[8]->write(&header.reserved, parameterDataTypes[8],
memspace, *dataspace_para);
i = 9;
dataset_para[9]->write(&header.debug, parameterDataTypes[9], memspace,
*dataspace_para);
i = 10;
dataset_para[10]->write(&header.roundRNumber, parameterDataTypes[10],
memspace, *dataspace_para);
i = 11;
dataset_para[11]->write(&header.detType, parameterDataTypes[11],
memspace, *dataspace_para);
i = 12;
dataset_para[12]->write(&header.version, parameterDataTypes[12],
memspace, *dataspace_para);
i = 13;
// contiguous bitset
if (sizeof(sls_bitset) == sizeof(bitset_storage)) {
dataset_para[13]->write((char *)&(rheader->packetsMask),
parameterDataTypes[13], memspace,
*dataspace_para);
}
// not contiguous bitset
else {
// get contiguous representation of bit mask
bitset_storage storage;
memset(storage, 0, sizeof(bitset_storage));
sls_bitset bits = rheader->packetsMask;
for (int i = 0; i < MAX_NUM_PACKETS; ++i)
storage[i >> 3] |= (bits[i] << (i & 7));
// write bitmask
dataset_para[13]->write((char *)storage, parameterDataTypes[13],
memspace, *dataspace_para);
}
i = 14;
} catch (const Exception &error) {
error.printErrorStack();
throw sls::RuntimeError(
"Could not write parameters (index:" + std::to_string(i) +
") to file in object " + std::to_string(index));
}
}
void HDF5File::ExtendDataset() {
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
try {
Exception::dontPrint(); // to handle errors
hsize_t dims[3];
dataspace->getSimpleExtentDims(dims);
dims[0] += *numImages;
dataset->extend(dims);
delete dataspace;
dataspace = nullptr;
dataspace = new DataSpace(dataset->getSpace());
hsize_t dims_para[1] = {dims[0]};
for (unsigned int i = 0; i < dataset_para.size(); ++i)
dataset_para[i]->extend(dims_para);
delete dataspace_para;
dataspace_para = nullptr;
dataspace_para = new DataSpace(dataset_para[0]->getSpace());
} catch (const Exception &error) {
error.printErrorStack();
throw sls::RuntimeError("Could not extend dataset in object " +
std::to_string(index));
}
if (!(*silentMode)) {
LOG(logINFO) << index << " Extending HDF5 dataset by " << extNumImages
<< ", Total x Dimension: " << (extNumImages + *numImages);
}
extNumImages += *numImages;
}
void HDF5File::CreateDataFile() {
std::ostringstream os;
os << *filePath << "/" << *fileNamePrefix << "_d"
<< (*detIndex * (*numUnitsPerDetector) + index) << "_f" << subFileIndex
<< '_' << *fileIndex << ".h5";
currentFileName = os.str();
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
uint64_t framestosave =
((*maxFramesPerFile == 0) ? *numImages : // infinite images
(((extNumImages - subFileIndex) > (*maxFramesPerFile))
? // save up to maximum at a time
(*maxFramesPerFile)
: (extNumImages - subFileIndex)));
uint64_t nDimx = framestosave;
uint32_t nDimy = nPixelsY;
uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX);
try {
Exception::dontPrint(); // to handle errors
// file
FileAccPropList fapl;
fapl.setFcloseDegree(H5F_CLOSE_STRONG);
filefd = nullptr;
if (!(*overWriteEnable))
filefd = new H5File(currentFileName.c_str(), H5F_ACC_EXCL,
FileCreatPropList::DEFAULT, fapl);
else
filefd = new H5File(currentFileName.c_str(), H5F_ACC_TRUNC,
FileCreatPropList::DEFAULT, fapl);
// attributes - version
double dValue = HDF5_WRITER_VERSION;
DataSpace dataspace_attr = DataSpace(H5S_SCALAR);
Attribute attribute = filefd->createAttribute(
"version", PredType::NATIVE_DOUBLE, dataspace_attr);
attribute.write(PredType::NATIVE_DOUBLE, &dValue);
// dataspace
hsize_t srcdims[3] = {nDimx, nDimy, nDimz};
hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz};
dataspace = nullptr;
dataspace = new DataSpace(3, srcdims, srcdimsmax);
// dataset name
std::ostringstream osfn;
osfn << "/data";
if (*numImages > 1)
osfn << "_f" << std::setfill('0') << std::setw(12) << subFileIndex;
std::string dsetname = osfn.str();
// dataset
// fill value
DSetCreatPropList plist;
int fill_value = -1;
plist.setFillValue(datatype, &fill_value);
// always create chunked dataset as unlimited is only
// supported with chunked layout
hsize_t chunk_dims[3] = {MAX_CHUNKED_IMAGES, nDimy, nDimz};
plist.setChunk(3, chunk_dims);
dataset = nullptr;
dataset = new DataSet(filefd->createDataSet(dsetname.c_str(), datatype,
*dataspace, plist));
// create parameter datasets
hsize_t dims[1] = {nDimx};
hsize_t dimsmax[1] = {H5S_UNLIMITED};
dataspace_para = nullptr;
dataspace_para = new DataSpace(1, dims, dimsmax);
// always create chunked dataset as unlimited is only
// supported with chunked layout
DSetCreatPropList paralist;
hsize_t chunkpara_dims[3] = {MAX_CHUNKED_IMAGES};
paralist.setChunk(1, chunkpara_dims);
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
DataSet *ds = new DataSet(filefd->createDataSet(
parameterNames[i].c_str(), parameterDataTypes[i],
*dataspace_para, paralist));
dataset_para.push_back(ds);
}
} catch (const Exception &error) {
error.printErrorStack();
if (filefd) {
filefd->close();
}
throw sls::RuntimeError("Could not create HDF5 handles in object " +
index);
}
if (!(*silentMode)) {
LOG(logINFO) << *udpPortNumber
<< ": HDF5 File created: " << currentFileName;
}
}
void HDF5File::CreateMasterDataFile(MasterAttributes *attr) {
std::ostringstream os;
os << *filePath << "/" << *fileNamePrefix << "_master"
<< "_" << *fileIndex << ".h5";
masterFileName = os.str();
if (!(*silentMode)) {
LOG(logINFO) << "Master File: " << masterFileName;
}
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
try {
Exception::dontPrint(); // to handle errors
FileAccPropList flist;
flist.setFcloseDegree(H5F_CLOSE_STRONG);
masterfd = nullptr;
if (!(*overWriteEnable))
masterfd = new H5File(masterFileName.c_str(), H5F_ACC_EXCL,
FileCreatPropList::DEFAULT, flist);
else
masterfd = new H5File(masterFileName.c_str(), H5F_ACC_TRUNC,
FileCreatPropList::DEFAULT, flist);
// Create a group in the file
Group group1(masterfd->createGroup("entry"));
Group group2(group1.createGroup("data"));
Group group3(group1.createGroup("instrument"));
Group group4(group3.createGroup("beam"));
Group group5(group3.createGroup("detector"));
Group group6(group1.createGroup("sample"));
attr->WriteMasterHDF5Attributes(masterfd, &group5);
masterfd->close();
} catch (const Exception &error) {
error.printErrorStack();
if (masterfd) {
masterfd->close();
}
throw sls::RuntimeError("Could not create master HDF5 handles");
}
}
void HDF5File::CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf) {
std::ostringstream osfn;
osfn << *filePath << "/" << *fileNamePrefix;
osfn << "_virtual";
osfn << "_" << *fileIndex;
osfn << ".h5";
std::string vname = osfn.str();
if (!(*silentMode)) {
LOG(logINFO) << "Virtual File: " << vname;
}
int numDetz = numDetX;
uint32_t nDimy = nPixelsY;
uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX);
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
try {
// file
hid_t dfal = H5Pcreate(H5P_FILE_ACCESS);
if (dfal < 0)
throw sls::RuntimeError(
"Could not create file access property for virtual file " +
vname);
if (H5Pset_fclose_degree(dfal, H5F_CLOSE_STRONG) < 0)
throw sls::RuntimeError(
"Could not set strong file close degree for virtual file " +
vname);
virtualfd = H5Fcreate(vname.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, dfal);
if (virtualfd < 0)
throw sls::RuntimeError("Could not create virtual file " + vname);
// attributes - version
hid_t dataspace_attr = H5Screate(H5S_SCALAR);
if (dataspace_attr < 0)
throw sls::RuntimeError(
"Could not create dataspace for attribute in virtual file " +
vname);
hid_t attrid = H5Acreate2(virtualfd, "version", H5T_NATIVE_DOUBLE,
dataspace_attr, H5P_DEFAULT, H5P_DEFAULT);
if (attrid < 0)
throw sls::RuntimeError(
"Could not create attribute in virtual file " + vname);
double attr_data = HDF5_WRITER_VERSION;
if (H5Awrite(attrid, H5T_NATIVE_DOUBLE, &attr_data) < 0)
throw sls::RuntimeError(
"Could not write attribute in virtual file " + vname);
if (H5Aclose(attrid) < 0)
throw sls::RuntimeError(
"Could not close attribute in virtual file " + vname);
// virtual dataspace
hsize_t vdsdims[3] = {numf, numDetY * nDimy, numDetz * nDimz};
hid_t vdsDataspace = H5Screate_simple(3, vdsdims, nullptr);
if (vdsDataspace < 0)
throw sls::RuntimeError(
"Could not create virtual dataspace in virtual file " + vname);
hsize_t vdsdims_para[2] = {numf, (unsigned int)numDetY * numDetz};
hid_t vdsDataspace_para = H5Screate_simple(2, vdsdims_para, nullptr);
if (vdsDataspace_para < 0)
throw sls::RuntimeError("Could not create virtual dataspace "
"(parameters) in virtual file " +
vname);
// fill values
hid_t dcpl = H5Pcreate(H5P_DATASET_CREATE);
if (dcpl < 0)
throw sls::RuntimeError(
"Could not create file creation properties in virtual file " +
vname);
int fill_value = -1;
if (H5Pset_fill_value(dcpl, GetDataTypeinC(datatype), &fill_value) < 0)
throw sls::RuntimeError(
"Could not create fill value in virtual file " + vname);
std::vector<hid_t> dcpl_para(parameterNames.size());
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
dcpl_para[i] = H5Pcreate(H5P_DATASET_CREATE);
if (dcpl_para[i] < 0)
throw sls::RuntimeError(
"Could not create file creation properties (parameters) in "
"virtual file " +
vname);
if (H5Pset_fill_value(dcpl_para[i],
GetDataTypeinC(parameterDataTypes[i]),
&fill_value) < 0)
throw sls::RuntimeError("Could not create fill value "
"(parameters) in virtual file " +
vname);
}
// hyperslab
int numMajorHyperslab = numf / maxFramesPerFile;
if (numf % maxFramesPerFile)
numMajorHyperslab++;
uint64_t framesSaved = 0;
for (int j = 0; j < numMajorHyperslab; j++) {
uint64_t nDimx = ((numf - framesSaved) > maxFramesPerFile)
? maxFramesPerFile
: (numf - framesSaved);
hsize_t offset[3] = {framesSaved, 0, 0};
hsize_t count[3] = {nDimx, nDimy, nDimz};
hsize_t offset_para[2] = {framesSaved, 0};
hsize_t count_para[2] = {nDimx, 1};
for (int i = 0; i < numDetY * numDetz; ++i) {
// setect hyperslabs
if (H5Sselect_hyperslab(vdsDataspace, H5S_SELECT_SET, offset,
nullptr, count, nullptr) < 0) {
throw sls::RuntimeError("Could not select hyperslab");
}
if (H5Sselect_hyperslab(vdsDataspace_para, H5S_SELECT_SET,
offset_para, nullptr, count_para,
nullptr) < 0) {
throw sls::RuntimeError(
"Could not select hyperslab for parameters");
}
// source file name
std::ostringstream os;
os << *filePath << "/" << *fileNamePrefix << "_d"
<< (*detIndex * (*numUnitsPerDetector) + i) << "_f" << j
<< '_' << *fileIndex << ".h5";
std::string srcFileName = os.str();
LOG(logDEBUG1) << srcFileName;
// find relative path
std::string relative_srcFileName = srcFileName;
{
size_t i = srcFileName.rfind('/', srcFileName.length());
if (i != std::string::npos)
relative_srcFileName = (srcFileName.substr(
i + 1, srcFileName.length() - i));
}
// source dataset name
std::ostringstream osfn;
osfn << "/data";
if (*numImages > 1)
osfn << "_f" << std::setfill('0') << std::setw(12) << j;
std::string srcDatasetName = osfn.str();
// source dataspace
hsize_t srcdims[3] = {nDimx, nDimy, nDimz};
hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz};
hid_t srcDataspace = H5Screate_simple(3, srcdims, srcdimsmax);
if (srcDataspace < 0)
throw sls::RuntimeError(
"Could not create source dataspace in virtual file " +
vname);
hsize_t srcdims_para[1] = {nDimx};
hsize_t srcdimsmax_para[1] = {H5S_UNLIMITED};
hid_t srcDataspace_para =
H5Screate_simple(1, srcdims_para, srcdimsmax_para);
if (srcDataspace_para < 0)
throw sls::RuntimeError("Could not create source dataspace "
"(parameters) in virtual file " +
vname);
// mapping
if (H5Pset_virtual(dcpl, vdsDataspace,
relative_srcFileName.c_str(),
srcDatasetName.c_str(), srcDataspace) < 0) {
throw sls::RuntimeError(
"Could not set mapping for paramter 1");
}
for (unsigned int k = 0; k < parameterNames.size(); ++k) {
if (H5Pset_virtual(dcpl_para[k], vdsDataspace_para,
relative_srcFileName.c_str(),
parameterNames[k].c_str(),
srcDataspace_para) < 0) {
throw sls::RuntimeError(
"Could not set mapping for paramter " +
std::to_string(k));
}
}
// H5Sclose(srcDataspace);
// H5Sclose(srcDataspace_para);
offset[2] += nDimz;
if (offset[2] >= (numDetz * nDimz)) {
offset[2] = 0;
offset[1] += nDimy;
}
offset_para[1]++;
}
framesSaved += nDimx;
}
// dataset
std::string virtualDatasetName = "data";
hid_t vdsdataset = H5Dcreate2(virtualfd, virtualDatasetName.c_str(),
GetDataTypeinC(datatype), vdsDataspace,
H5P_DEFAULT, dcpl, H5P_DEFAULT);
if (vdsdataset < 0)
throw sls::RuntimeError(
"Could not create virutal dataset in virtual file " + vname);
// virtual parameter dataset
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
hid_t vdsdataset_para = H5Dcreate2(
virtualfd, parameterNames[i].c_str(),
GetDataTypeinC(parameterDataTypes[i]), vdsDataspace_para,
H5P_DEFAULT, dcpl_para[i], H5P_DEFAULT);
if (vdsdataset_para < 0)
throw sls::RuntimeError("Could not create virutal dataset "
"(parameters) in virtual file " +
vname);
}
// close
H5Fclose(virtualfd);
virtualfd = 0;
// link
LinkVirtualInMaster(vname, virtualDatasetName);
} catch (const sls::RuntimeError &e) {
if (virtualfd > 0)
H5Fclose(virtualfd);
virtualfd = 0;
}
}
void HDF5File::LinkVirtualInMaster(std::string fname, std::string dsetname) {
if (fname == currentFileName) {
std::lock_guard<std::mutex> lock(HDF5File::hdf5Lib);
}
char linkname[100];
hid_t vfd = 0;
try {
hid_t dfal = H5Pcreate(H5P_FILE_ACCESS);
if (dfal < 0)
throw sls::RuntimeError(
"Could not create file access property for link");
if (H5Pset_fclose_degree(dfal, H5F_CLOSE_STRONG) < 0)
throw sls::RuntimeError(
"Could not set strong file close degree for link");
// open master file
hid_t mfd = H5Fopen(masterFileName.c_str(), H5F_ACC_RDWR, dfal);
if (mfd < 0)
throw sls::RuntimeError("Could not open master file");
// open virtual file
vfd = H5Fopen(fname.c_str(), H5F_ACC_RDWR, dfal);
if (vfd < 0) {
H5Fclose(mfd);
mfd = 0;
throw sls::RuntimeError("Could not open virtual file");
}
// find relative path
std::string relative_virtualfname = fname;
{
size_t i = fname.rfind('/', fname.length());
if (i != std::string::npos)
relative_virtualfname =
(fname.substr(i + 1, fname.length() - i));
}
//**data dataset**
hid_t vdset = H5Dopen2(vfd, dsetname.c_str(), H5P_DEFAULT);
if (vdset < 0) {
H5Fclose(mfd);
throw sls::RuntimeError("Could not open virtual data dataset");
}
sprintf(linkname, "/entry/data/%s", dsetname.c_str());
if (H5Lcreate_external(relative_virtualfname.c_str(), dsetname.c_str(),
mfd, linkname, H5P_DEFAULT, H5P_DEFAULT) < 0) {
H5Fclose(mfd);
mfd = 0;
throw sls::RuntimeError("Could not create link to data dataset");
}
H5Dclose(vdset);
//**paramter datasets**
for (unsigned int i = 0; i < parameterNames.size(); ++i) {
hid_t vdset_para = H5Dopen2(
vfd, (std::string(parameterNames[i])).c_str(), H5P_DEFAULT);
if (vdset_para < 0) {
H5Fclose(mfd);
mfd = 0;
throw sls::RuntimeError(
"Could not open virtual parameter dataset to create link");
}
sprintf(linkname, "/entry/data/%s",
(std::string(parameterNames[i])).c_str());
if (H5Lcreate_external(relative_virtualfname.c_str(),
parameterNames[i].c_str(), mfd, linkname,
H5P_DEFAULT, H5P_DEFAULT) < 0) {
H5Fclose(mfd);
mfd = 0;
throw sls::RuntimeError(
"Could not create link to virtual parameter dataset");
}
}
H5Fclose(mfd);
mfd = 0;
H5Fclose(vfd);
vfd = 0;
} catch (...) {
if (vfd > 0)
H5Fclose(vfd);
vfd = 0;
}
}
hid_t HDF5File::GetDataTypeinC(DataType dtype) {
if (dtype == PredType::STD_U8LE)
return H5T_STD_U8LE;
else if (dtype == PredType::STD_U16LE)
return H5T_STD_U16LE;
else if (dtype == PredType::STD_U32LE)
return H5T_STD_U32LE;
else if (dtype == PredType::STD_U64LE)
return H5T_STD_U64LE;
else {
hid_t s = H5Tcopy(H5T_C_S1);
H5Tset_size(s, MAX_NUM_PACKETS);
return s;
}
}

View File

@ -0,0 +1,92 @@
#pragma once
/************************************************
* @file HDF5File.h
* @short sets/gets properties for the HDF5 file,
* creates/closes the file and writes data to it
***********************************************/
/**
*@short sets/gets properties for the HDF5 file, creates/closes the file and
*writes data to it
*/
#include "File.h"
#include "H5Cpp.h"
#ifndef H5_NO_NAMESPACE
using namespace H5;
#endif
#include <mutex>
class HDF5File : private virtual slsDetectorDefs, public File {
public:
/**
* Constructor
* creates the File Writer
* @param ind self index
* @param maxf pointer to max frames per file
* @param nd pointer to number of detectors in each dimension
* @param fname pointer to file name prefix
* @param fpath pointer to file path
* @param findex pointer to file index
* @param owenable pointer to over write enable
* @param dindex pointer to detector index
* @param nunits pointer to number of theads/ units per detector
* @param nf pointer to number of images in acquisition
* @param dr pointer to dynamic range
* @param portno pointer to udp port number for logging
* @param nx number of pixels in x direction
* @param ny number of pixels in y direction
* @param smode pointer to silent mode
*/
HDF5File(int ind, uint32_t *maxf, int *nd, std::string *fname,
std::string *fpath, uint64_t *findex, bool *owenable, int *dindex,
int *nunits, uint64_t *nf, uint32_t *dr, uint32_t *portno,
uint32_t nx, uint32_t ny, bool *smode);
~HDF5File();
void SetNumberofPixels(uint32_t nx, uint32_t ny);
void CreateFile() override;
void CloseAllFiles() override;
void CloseCurrentDataFile() override;
void CloseMasterFile() override;
void WriteToFile(char *buffer, int bufferSize, uint64_t currentFrameNumber,
uint32_t numPacketsCaught) override;
void CreateMasterFile(MasterAttributes *attr) override;
void StartofAcquisition() override;
void EndofAcquisition(bool anyPacketsCaught, uint64_t numImagesCaught);
private:
void CloseFile(H5File *&fd, bool masterFile);
void WriteDataFile(uint64_t currentFrameNumber, char *buffer);
void WriteParameterDatasets(uint64_t currentFrameNumber,
sls_receiver_header *rheader);
void ExtendDataset();
void CreateDataFile();
void CreateMasterDataFile(MasterAttributes *attr);
void CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf);
void LinkVirtualInMaster(std::string fname, std::string dsetname);
hid_t GetDataTypeinC(DataType dtype);
static std::mutex hdf5Lib;
H5File *masterfd;
/** Virtual File handle ( only file name because
code in C as H5Pset_virtual doesnt exist yet in C++) */
hid_t virtualfd;
H5File *filefd;
DataSpace *dataspace;
DataSet *dataset;
DataType datatype;
uint32_t nPixelsX;
uint32_t nPixelsY;
uint32_t numFramesInFile;
int numFilesinAcquisition;
std::vector<std::string> parameterNames;
std::vector<DataType> parameterDataTypes;
DataSpace *dataspace_para;
std::vector<DataSet *> dataset_para;
uint64_t extNumImages;
};

File diff suppressed because it is too large Load Diff