mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-06-23 01:58:00 +02:00
wip
This commit is contained in:
@ -223,6 +223,8 @@ int ClientInterface::functionTable(){
|
||||
flist[F_RECEIVER_GET_RECEIVER_ROI] = &ClientInterface::get_receiver_roi;
|
||||
flist[F_RECEIVER_SET_RECEIVER_ROI] = &ClientInterface::set_receiver_roi;
|
||||
flist[F_RECEIVER_SET_RECEIVER_ROI_METADATA] = &ClientInterface::set_receiver_roi_metadata;
|
||||
flist[F_GET_RECEIVER_BUNCH_SIZE] = &ClientInterface::get_bunch_size;
|
||||
flist[F_SET_RECEIVER_BUNCH_SIZE] = &ClientInterface::set_bunch_size;
|
||||
|
||||
for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) {
|
||||
LOG(logDEBUG1) << "function fnum: " << i << " (" <<
|
||||
@ -1176,7 +1178,7 @@ int ClientInterface::get_additional_json_header(Interface &socket) {
|
||||
int ClientInterface::set_udp_socket_buffer_size(Interface &socket) {
|
||||
auto size = socket.Receive<int>();
|
||||
if (size == 0) {
|
||||
throw RuntimeError("Receiver socket buffer size must be > 0.");
|
||||
throw RuntimeError("Receiver socket buffer size must be greater than 0.");
|
||||
}
|
||||
if (size > 0) {
|
||||
verifyIdle(socket);
|
||||
@ -1780,4 +1782,29 @@ int ClientInterface::set_receiver_roi_metadata(Interface &socket) {
|
||||
return socket.Send(OK);
|
||||
}
|
||||
|
||||
int ClientInterface::get_bunch_size(Interface &socket) {
|
||||
int retval = impl()->getBunchSize();
|
||||
LOG(logDEBUG1) << "bunch size retval:" << retval;
|
||||
return socket.sendResult(retval);
|
||||
}
|
||||
|
||||
int ClientInterface::set_bunch_size(Interface &socket) {
|
||||
auto value = socket.Receive<int>();
|
||||
if (value <= 0) {
|
||||
throw RuntimeError("Could not set rx bunch size. Must be greater than 0.");
|
||||
}
|
||||
verifyIdle(socket);
|
||||
LOG(logDEBUG1) << "Setting bunch size:" << value;
|
||||
try {
|
||||
impl()->setBunchSize(value);
|
||||
} catch (const RuntimeError &e) {
|
||||
throw RuntimeError("Could not set rx bunch size due to fifo structure memory allocation.");
|
||||
}
|
||||
|
||||
int retval = impl()->getBunchSize();
|
||||
validate(value, retval, std::string("set bunch size"), DEC);
|
||||
LOG(logDEBUG1) << "bunch size retval:" << retval;
|
||||
return socket.sendResult(retval);
|
||||
}
|
||||
|
||||
} // namespace sls
|
||||
|
@ -170,6 +170,9 @@ class ClientInterface : private virtual slsDetectorDefs {
|
||||
int get_receiver_roi(ServerInterface &socket);
|
||||
int set_receiver_roi(ServerInterface &socket);
|
||||
int set_receiver_roi_metadata(ServerInterface &socket);
|
||||
int get_bunch_size(ServerInterface &socket);
|
||||
int set_bunch_size(ServerInterface &socket);
|
||||
|
||||
|
||||
Implementation *impl() {
|
||||
if (receiver != nullptr) {
|
||||
|
@ -44,6 +44,7 @@ DataProcessor::DataProcessor(int index, detectorType detectorType, Fifo *fifo,
|
||||
ctbAnalogDataBytes_(ctbAnalogDataBytes) {
|
||||
|
||||
LOG(logDEBUG) << "DataProcessor " << index << " created";
|
||||
vetoThread = (detectorType_ == GOTTHARD2 && index != 0);
|
||||
}
|
||||
|
||||
DataProcessor::~DataProcessor() { DeleteFiles(); }
|
||||
@ -59,6 +60,10 @@ void DataProcessor::SetReceiverROI(ROI roi) {
|
||||
receiverRoiEnabled_ = receiverRoi_.completeRoi() ? false : true;
|
||||
}
|
||||
|
||||
void DataProcessor::SetBunchSize(uint32_t value) {
|
||||
fifoBunchSize = value;
|
||||
}
|
||||
|
||||
void DataProcessor::ResetParametersforNewAcquisition() {
|
||||
StopRunning();
|
||||
startedFlag_ = false;
|
||||
@ -68,6 +73,12 @@ void DataProcessor::ResetParametersforNewAcquisition() {
|
||||
firstStreamerFrame_ = true;
|
||||
streamCurrentFrame_ = false;
|
||||
completeImageToStreamBeforeCropping = make_unique<char[]>(generalData_->imageSize);
|
||||
|
||||
fifoBunchSizeBytes = generalData_->imageSize;
|
||||
if (vetoThread) {
|
||||
fifoBunchSizeBytes = generalData_->vetoDataSize;
|
||||
}
|
||||
fifoBunchSizeBytes += generalData_->fifoBufferHeaderSize;
|
||||
}
|
||||
|
||||
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
|
||||
@ -252,24 +263,30 @@ std::string DataProcessor::CreateMasterFile(
|
||||
void DataProcessor::ThreadExecution() {
|
||||
char *buffer = nullptr;
|
||||
fifo_->PopAddress(buffer);
|
||||
LOG(logDEBUG5) << "DataProcessor " << index << ", " << std::hex
|
||||
<< static_cast<void *>(buffer) << std::dec << ":" << buffer;
|
||||
LOG(logINFOBLUE) << "DataProcessor " << index << ", " << std::hex
|
||||
<< static_cast<void *>(buffer) << std::dec;
|
||||
|
||||
// check dummy
|
||||
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
|
||||
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
|
||||
if (numBytes == DUMMY_PACKET_VALUE) {
|
||||
StopProcessing(buffer);
|
||||
return;
|
||||
char* tempBuffer = buffer;
|
||||
for (uint32_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
|
||||
|
||||
// end of acquisition (check dummy)
|
||||
auto numBytes = *reinterpret_cast<uint32_t *>(tempBuffer);
|
||||
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
|
||||
if (numBytes == DUMMY_PACKET_VALUE) {
|
||||
StopProcessing(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
ProcessAnImage(tempBuffer);
|
||||
}
|
||||
// exception from callback
|
||||
catch (const std::exception &e) {
|
||||
;
|
||||
}
|
||||
tempBuffer += fifoBunchSizeBytes;
|
||||
}
|
||||
|
||||
try {
|
||||
ProcessAnImage(buffer);
|
||||
} catch (const std::exception &e) {
|
||||
fifo_->FreeAddress(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// stream (if time/freq to stream) or free
|
||||
if (streamCurrentFrame_) {
|
||||
// copy the complete image back if roi enabled
|
||||
@ -284,9 +301,9 @@ void DataProcessor::ThreadExecution() {
|
||||
}
|
||||
|
||||
void DataProcessor::StopProcessing(char *buf) {
|
||||
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
|
||||
LOG(logINFORED) << "DataProcessing " << index << ": Dummy";
|
||||
|
||||
// stream or free
|
||||
// stream dummy or free
|
||||
if (*dataStreamEnable_)
|
||||
fifo_->PushAddressToStream(buf);
|
||||
else
|
||||
@ -428,11 +445,11 @@ bool DataProcessor::CheckTimer() {
|
||||
}
|
||||
|
||||
bool DataProcessor::CheckCount() {
|
||||
if (currentFreqCount_ == *streamingFrequency_) {
|
||||
if (currentFreqCount_ >= *streamingFrequency_) {
|
||||
currentFreqCount_ = 1;
|
||||
return true;
|
||||
}
|
||||
currentFreqCount_++;
|
||||
currentFreqCount_ += fifoBunchSize;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -458,7 +475,7 @@ void DataProcessor::PadMissingPackets(char *buf) {
|
||||
sls_bitset pmask = header->packetsMask;
|
||||
|
||||
uint32_t dsize = generalData_->dataSize;
|
||||
if (detectorType_ == GOTTHARD2 && index != 0) {
|
||||
if (vetoThread) {
|
||||
dsize = generalData_->vetoDataSize;
|
||||
}
|
||||
uint32_t fifohsize = generalData_->fifoBufferHeaderSize;
|
||||
|
@ -44,6 +44,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
|
||||
void SetReceiverROI(ROI roi);
|
||||
void ResetParametersforNewAcquisition();
|
||||
void SetGeneralData(GeneralData *generalData);
|
||||
void SetBunchSize(uint32_t value);
|
||||
|
||||
void CloseFiles();
|
||||
void DeleteFiles();
|
||||
@ -157,6 +158,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
|
||||
bool activated_{false};
|
||||
ROI receiverRoi_{};
|
||||
bool receiverRoiEnabled_{false};
|
||||
bool vetoThread{false};
|
||||
|
||||
std::unique_ptr<char[]> completeImageToStreamBeforeCropping;
|
||||
/** if 0, sending random images with a timer */
|
||||
uint32_t *streamingFrequency_;
|
||||
@ -184,6 +187,10 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
|
||||
|
||||
File *dataFile_{nullptr};
|
||||
|
||||
uint32_t fifoBunchSize{0};
|
||||
/** size in memory including headers */
|
||||
uint32_t fifoBunchSizeBytes{0};
|
||||
|
||||
// call back
|
||||
/**
|
||||
* Call back for raw data
|
||||
|
@ -18,14 +18,15 @@ namespace sls {
|
||||
|
||||
const std::string DataStreamer::TypeName = "DataStreamer";
|
||||
|
||||
DataStreamer::DataStreamer(int ind, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi,
|
||||
DataStreamer::DataStreamer(int ind, detectorType dType, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi,
|
||||
bool fr, slsDetectorDefs::xy np, bool *qe,
|
||||
uint64_t *tot)
|
||||
: ThreadObject(ind, TypeName), fifo(f), dynamicRange(dr), detectorRoi(r),
|
||||
: ThreadObject(ind, TypeName), fifo(f), detType(dType), dynamicRange(dr), detectorRoi(r),
|
||||
fileIndex(fi), flipRows(fr), numPorts(np), quadEnable(qe),
|
||||
totalNumFrames(tot) {
|
||||
|
||||
LOG(logDEBUG) << "DataStreamer " << ind << " created";
|
||||
vetoThread = (detType == GOTTHARD2 && index != 0);
|
||||
}
|
||||
|
||||
DataStreamer::~DataStreamer() {
|
||||
@ -50,6 +51,11 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) {
|
||||
completeBuffer = new char[generalData->imageSizeComplete];
|
||||
memset(completeBuffer, 0, generalData->imageSizeComplete);
|
||||
}
|
||||
fifoBunchSizeBytes = generalData->imageSize;
|
||||
if (vetoThread) {
|
||||
fifoBunchSizeBytes = generalData->vetoDataSize;
|
||||
}
|
||||
fifoBunchSizeBytes += generalData->fifoBufferHeaderSize;
|
||||
}
|
||||
|
||||
void DataStreamer::RecordFirstIndex(uint64_t fnum, char *buf) {
|
||||
@ -75,6 +81,10 @@ void DataStreamer::SetAdditionalJsonHeader(
|
||||
isAdditionalJsonUpdated = true;
|
||||
}
|
||||
|
||||
void DataStreamer::SetBunchSize(uint32_t value) {
|
||||
fifoBunchSize = value;
|
||||
}
|
||||
|
||||
void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port,
|
||||
const IpAddr ip, int hwm) {
|
||||
uint32_t portnum = port + index;
|
||||
@ -115,15 +125,20 @@ void DataStreamer::ThreadExecution() {
|
||||
"pop 0x"
|
||||
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
|
||||
|
||||
// check dummy
|
||||
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
|
||||
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
|
||||
if (numBytes == DUMMY_PACKET_VALUE) {
|
||||
StopProcessing(buffer);
|
||||
return;
|
||||
}
|
||||
char* tempBuffer = buffer;
|
||||
for (uint32_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
|
||||
|
||||
ProcessAnImage(buffer);
|
||||
// end of acquisition (check dummy)
|
||||
auto numBytes = *reinterpret_cast<uint32_t *>(tempBuffer);
|
||||
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
|
||||
if (numBytes == DUMMY_PACKET_VALUE) {
|
||||
StopProcessing(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
ProcessAnImage(tempBuffer);
|
||||
tempBuffer += fifoBunchSizeBytes;
|
||||
}
|
||||
|
||||
// free
|
||||
fifo->FreeAddress(buffer);
|
||||
|
@ -30,6 +30,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
* Calls Base Class CreateThread(), sets ErrorMask if error and increments
|
||||
* NumberofDataStreamers
|
||||
* @param ind self index
|
||||
* @param dType detector type
|
||||
* @param f address of Fifo pointer
|
||||
* @param dr pointer to dynamic range
|
||||
* @param r detectorRoi
|
||||
@ -39,7 +40,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
* @param qe pointer to quad Enable
|
||||
* @param tot pointer to total number of frames
|
||||
*/
|
||||
DataStreamer(int ind, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi, bool fr,
|
||||
DataStreamer(int ind, detectorType dType, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi, bool fr,
|
||||
xy np, bool *qe, uint64_t *tot);
|
||||
|
||||
/**
|
||||
@ -55,6 +56,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
void SetFlipRows(bool fd);
|
||||
void
|
||||
SetAdditionalJsonHeader(const std::map<std::string, std::string> &json);
|
||||
void SetBunchSize(uint32_t value);
|
||||
|
||||
/**
|
||||
* Creates Zmq Sockets
|
||||
@ -105,6 +107,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
static const std::string TypeName;
|
||||
const GeneralData *generalData{nullptr};
|
||||
Fifo *fifo;
|
||||
detectorType detType;
|
||||
ZmqSocket *zmqSocket{nullptr};
|
||||
uint32_t *dynamicRange;
|
||||
ROI *detectorRoi;
|
||||
@ -112,6 +115,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
uint64_t *fileIndex;
|
||||
bool flipRows;
|
||||
std::map<std::string, std::string> additionalJsonHeader;
|
||||
bool vetoThread{false};
|
||||
|
||||
/** Used by streamer thread to update local copy (reduce number of locks
|
||||
* during streaming) */
|
||||
@ -132,6 +136,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
|
||||
xy numPorts{1, 1};
|
||||
bool *quadEnable;
|
||||
uint64_t *totalNumFrames;
|
||||
|
||||
uint32_t fifoBunchSize{0};
|
||||
/** size in memory including headers */
|
||||
uint32_t fifoBunchSizeBytes{0};
|
||||
};
|
||||
|
||||
} // namespace sls
|
||||
|
@ -65,6 +65,7 @@ class GeneralData {
|
||||
uint32_t adcEnableMaskTenGiga{BIT32_MASK};
|
||||
slsDetectorDefs::ROI roi{};
|
||||
uint32_t counterMask{0};
|
||||
uint32_t defaultBunchSize{1};
|
||||
|
||||
GeneralData(){};
|
||||
virtual ~GeneralData(){};
|
||||
@ -445,10 +446,11 @@ class Gotthard2Data : public GeneralData {
|
||||
maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE;
|
||||
fifoBufferHeaderSize =
|
||||
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
|
||||
defaultFifoDepth = 50000;
|
||||
defaultFifoDepth = 50;
|
||||
standardheader = true;
|
||||
vetoDataSize = 160;
|
||||
vetoHsize = 16;
|
||||
defaultBunchSize = 10000;
|
||||
UpdateImageSize();
|
||||
};
|
||||
|
||||
|
@ -67,16 +67,17 @@ void Implementation::SetThreadPriorities() {
|
||||
void Implementation::SetupFifoStructure() {
|
||||
fifo.clear();
|
||||
for (int i = 0; i < numUDPInterfaces; ++i) {
|
||||
uint32_t datasize = generalData->imageSize;
|
||||
size_t datasize = generalData->imageSize;
|
||||
// veto data size
|
||||
if (detType == GOTTHARD2 && i != 0) {
|
||||
datasize = generalData->vetoImageSize;
|
||||
}
|
||||
datasize += generalData->fifoBufferHeaderSize;
|
||||
datasize *= bunchSize;
|
||||
|
||||
// create fifo structure
|
||||
try {
|
||||
fifo.push_back(sls::make_unique<Fifo>(
|
||||
i, datasize + (generalData->fifoBufferHeaderSize), fifoDepth));
|
||||
fifo.push_back(sls::make_unique<Fifo>(i, datasize, fifoDepth));
|
||||
} catch (...) {
|
||||
fifo.clear();
|
||||
fifoDepth = 0;
|
||||
@ -93,9 +94,7 @@ void Implementation::SetupFifoStructure() {
|
||||
dataStreamer[i]->SetFifo(fifo[i].get());
|
||||
|
||||
LOG(logINFO) << "Memory Allocated for Fifo " << i << ": "
|
||||
<< (double)(((size_t)(datasize) +
|
||||
(size_t)(generalData->fifoBufferHeaderSize)) *
|
||||
(size_t)fifoDepth) /
|
||||
<< (double)(datasize * (size_t)fifoDepth) /
|
||||
(double)(1024 * 1024)
|
||||
<< " MB";
|
||||
}
|
||||
@ -169,6 +168,7 @@ void Implementation::setDetectorType(const detectorType d) {
|
||||
adcEnableMaskTenGiga = generalData->adcEnableMaskTenGiga;
|
||||
detectorRoi = generalData->roi;
|
||||
counterMask = generalData->counterMask;
|
||||
bunchSize = generalData->defaultBunchSize;
|
||||
|
||||
SetLocalNetworkParameters();
|
||||
SetupFifoStructure();
|
||||
@ -204,10 +204,12 @@ void Implementation::setDetectorType(const detectorType d) {
|
||||
for (const auto &it : listener) {
|
||||
it->SetGeneralData(generalData);
|
||||
it->SetActivate(activated);
|
||||
it->SetBunchSize(bunchSize);
|
||||
}
|
||||
for (const auto &it : dataProcessor) {
|
||||
it->SetGeneralData(generalData);
|
||||
it->SetActivate(activated);
|
||||
it->SetBunchSize(bunchSize);
|
||||
}
|
||||
SetThreadPriorities();
|
||||
|
||||
@ -453,6 +455,22 @@ void Implementation::setReceiverROIMetadata(const ROI arg) {
|
||||
LOG(logINFO) << "receiver roi Metadata: " << ToString(receiverRoiMetadata);
|
||||
}
|
||||
|
||||
uint32_t Implementation::getBunchSize() const { return bunchSize; }
|
||||
|
||||
void Implementation::setBunchSize(const uint32_t i) {
|
||||
if (bunchSize != i) {
|
||||
bunchSize = i;
|
||||
for (const auto &it : listener)
|
||||
it->SetBunchSize(i);
|
||||
for (const auto &it : dataProcessor)
|
||||
it->SetBunchSize(i);
|
||||
for (const auto &it : dataStreamer)
|
||||
it->SetBunchSize(i);
|
||||
SetupFifoStructure();
|
||||
}
|
||||
LOG(logINFO) << "Fifo Bunch Size: " << i;
|
||||
}
|
||||
|
||||
/**************************************************
|
||||
* *
|
||||
* File Parameters *
|
||||
@ -677,10 +695,11 @@ void Implementation::stopReceiver() {
|
||||
for (const auto &it : listener)
|
||||
if (it->IsRunning())
|
||||
running = true;
|
||||
|
||||
LOG(logINFOBLUE) << "listener done";
|
||||
for (const auto &it : dataProcessor)
|
||||
if (it->IsRunning())
|
||||
running = true;
|
||||
LOG(logINFOBLUE) << "processor done";
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
|
||||
@ -1003,6 +1022,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
|
||||
&silentMode));
|
||||
listener[i]->SetGeneralData(generalData);
|
||||
listener[i]->SetActivate(activated);
|
||||
listener[i]->SetBunchSize(bunchSize);
|
||||
|
||||
int ctbAnalogDataBytes = 0;
|
||||
if (detType == CHIPTESTBOARD) {
|
||||
@ -1017,6 +1037,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
|
||||
dataProcessor[i]->SetGeneralData(generalData);
|
||||
dataProcessor[i]->SetActivate(activated);
|
||||
dataProcessor[i]->SetReceiverROI(portRois[i]);
|
||||
dataProcessor[i]->SetBunchSize(bunchSize);
|
||||
} catch (...) {
|
||||
listener.clear();
|
||||
dataProcessor.clear();
|
||||
@ -1032,7 +1053,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
|
||||
flip = (i == 1 ? true : false);
|
||||
}
|
||||
dataStreamer.push_back(sls::make_unique<DataStreamer>(
|
||||
i, fifo[i].get(), &dynamicRange, &detectorRoi,
|
||||
i, detType, fifo[i].get(), &dynamicRange, &detectorRoi,
|
||||
&fileIndex, flip, numPorts, &quadEnable,
|
||||
&numberOfTotalFrames));
|
||||
dataStreamer[i]->SetGeneralData(generalData);
|
||||
@ -1041,6 +1062,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
|
||||
streamingHwm);
|
||||
dataStreamer[i]->SetAdditionalJsonHeader(
|
||||
additionalJsonHeader);
|
||||
dataStreamer[i]->SetBunchSize(bunchSize);
|
||||
} catch (...) {
|
||||
if (dataStreamEnable) {
|
||||
dataStreamer.clear();
|
||||
@ -1159,7 +1181,7 @@ void Implementation::setDataStreamEnable(const bool enable) {
|
||||
flip = (i == 1 ? true : false);
|
||||
}
|
||||
dataStreamer.push_back(sls::make_unique<DataStreamer>(
|
||||
i, fifo[i].get(), &dynamicRange, &detectorRoi,
|
||||
i, detType, fifo[i].get(), &dynamicRange, &detectorRoi,
|
||||
&fileIndex, flip, numPorts, &quadEnable,
|
||||
&numberOfTotalFrames));
|
||||
dataStreamer[i]->SetGeneralData(generalData);
|
||||
@ -1168,6 +1190,7 @@ void Implementation::setDataStreamEnable(const bool enable) {
|
||||
streamingHwm);
|
||||
dataStreamer[i]->SetAdditionalJsonHeader(
|
||||
additionalJsonHeader);
|
||||
dataStreamer[i]->SetBunchSize(bunchSize);
|
||||
} catch (...) {
|
||||
dataStreamer.clear();
|
||||
dataStreamEnable = false;
|
||||
|
@ -59,6 +59,8 @@ class Implementation : private virtual slsDetectorDefs {
|
||||
ROI getReceiverROI() const;
|
||||
void setReceiverROI(const ROI arg);
|
||||
void setReceiverROIMetadata(const ROI arg);
|
||||
uint32_t getBunchSize() const;
|
||||
void setBunchSize(const uint32_t i);
|
||||
|
||||
/**************************************************
|
||||
* *
|
||||
@ -311,6 +313,7 @@ class Implementation : private virtual slsDetectorDefs {
|
||||
std::array<ROI, 2> portRois{};
|
||||
// receiver roi for complete detector for metadata
|
||||
ROI receiverRoiMetadata{};
|
||||
uint32_t bunchSize{0};
|
||||
|
||||
// file parameters
|
||||
fileFormat fileFormatType{BINARY};
|
||||
|
@ -32,6 +32,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo *f,
|
||||
actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp),
|
||||
detectorDataStream(detds), silentMode(sm) {
|
||||
LOG(logDEBUG) << "Listener " << ind << " created";
|
||||
vetoThread = (myDetectorType == GOTTHARD2 && index != 0);
|
||||
}
|
||||
|
||||
Listener::~Listener() = default;
|
||||
@ -85,7 +86,7 @@ void Listener::ResetParametersforNewAcquisition() {
|
||||
lastCaughtFrameIndex = 0;
|
||||
carryOverFlag = false;
|
||||
uint32_t packetSize = generalData->packetSize;
|
||||
if (myDetectorType == GOTTHARD2 && index != 0) {
|
||||
if (vetoThread) {
|
||||
packetSize = generalData->vetoPacketSize;
|
||||
}
|
||||
carryOverPacket = make_unique<char[]>(packetSize);
|
||||
@ -98,6 +99,12 @@ void Listener::ResetParametersforNewAcquisition() {
|
||||
// reset fifo statistic
|
||||
fifo->GetMaxLevelForFifoBound();
|
||||
fifo->GetMinLevelForFifoFree();
|
||||
|
||||
fifoBunchSizeBytes = generalData->imageSize;
|
||||
if (vetoThread) {
|
||||
fifoBunchSizeBytes = generalData->vetoDataSize;
|
||||
}
|
||||
fifoBunchSizeBytes += generalData->fifoBufferHeaderSize;
|
||||
}
|
||||
|
||||
void Listener::RecordFirstIndex(uint64_t fnum) {
|
||||
@ -119,6 +126,10 @@ void Listener::SetGeneralData(GeneralData *g) { generalData = g; }
|
||||
|
||||
void Listener::SetActivate(bool enable) { activated = enable; }
|
||||
|
||||
void Listener::SetBunchSize(uint32_t value) {
|
||||
fifoBunchSize = value;
|
||||
}
|
||||
|
||||
void Listener::CreateUDPSockets() {
|
||||
if (!activated || !(*detectorDataStream)) {
|
||||
return;
|
||||
@ -135,7 +146,7 @@ void Listener::CreateUDPSockets() {
|
||||
ShutDownUDPSocket();
|
||||
|
||||
uint32_t packetSize = generalData->packetSize;
|
||||
if (myDetectorType == GOTTHARD2 && index != 0) {
|
||||
if (vetoThread) {
|
||||
packetSize = generalData->vetoPacketSize;
|
||||
}
|
||||
|
||||
@ -184,7 +195,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
|
||||
}
|
||||
|
||||
uint32_t packetSize = generalData->packetSize;
|
||||
if (myDetectorType == GOTTHARD2 && index != 0) {
|
||||
if (vetoThread) {
|
||||
packetSize = generalData->vetoPacketSize;
|
||||
}
|
||||
|
||||
@ -220,51 +231,48 @@ void Listener::SetHardCodedPosition(uint16_t r, uint16_t c) {
|
||||
|
||||
void Listener::ThreadExecution() {
|
||||
char *buffer;
|
||||
int rc = 0;
|
||||
|
||||
fifo->GetNewAddress(buffer);
|
||||
LOG(logDEBUG5) << "Listener " << index
|
||||
<< ", "
|
||||
"pop 0x"
|
||||
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
|
||||
|
||||
// udpsocket doesnt exist
|
||||
if (activated && *detectorDataStream && !udpSocketAlive && !carryOverFlag) {
|
||||
// LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not
|
||||
// created or shut down earlier";
|
||||
(*((uint32_t *)buffer)) = 0;
|
||||
// get data
|
||||
char* tempBuffer = buffer;
|
||||
for (uint32_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
|
||||
|
||||
// end of acquisition or not activated
|
||||
if ((*status == TRANSMITTING || !udpSocketAlive) && !carryOverFlag) {
|
||||
(*((uint32_t *)tempBuffer)) = DUMMY_PACKET_VALUE;
|
||||
StopListening(buffer);
|
||||
return;
|
||||
}
|
||||
LOG(logDEBUG) << "iframe:" << iFrame << " currentframeindex:" << currentFrameIndex;
|
||||
int rc = ListenToAnImage(tempBuffer);
|
||||
|
||||
// socket closed or discarding image (free retake)
|
||||
// weird frame numbers (print and rc = 0), then retake
|
||||
if (rc <= 0) {
|
||||
if (udpSocketAlive) {
|
||||
--iFrame;
|
||||
}
|
||||
} else {
|
||||
(*((uint32_t *)tempBuffer)) = rc;
|
||||
tempBuffer += fifoBunchSizeBytes;
|
||||
}
|
||||
}
|
||||
|
||||
// last check
|
||||
if ((*status != TRANSMITTING || !udpSocketAlive) && !carryOverFlag) {
|
||||
LOG(logINFOBLUE) << "Last check ";
|
||||
(*((uint32_t *)tempBuffer)) = DUMMY_PACKET_VALUE;
|
||||
StopListening(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
// get data
|
||||
if ((*status != TRANSMITTING &&
|
||||
(!activated || !(*detectorDataStream) || udpSocketAlive)) ||
|
||||
carryOverFlag) {
|
||||
rc = ListenToAnImage(buffer);
|
||||
}
|
||||
|
||||
// error check, (should not be here) if not transmitting yet (previous if)
|
||||
// rc should be > 0
|
||||
if (rc == 0) {
|
||||
if (!udpSocketAlive) {
|
||||
(*((uint32_t *)buffer)) = 0;
|
||||
StopListening(buffer);
|
||||
} else
|
||||
fifo->FreeAddress(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
// discarding image
|
||||
else if (rc < 0) {
|
||||
fifo->FreeAddress(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
(*((uint32_t *)buffer)) = rc;
|
||||
|
||||
// push into fifo
|
||||
fifo->PushAddress(buffer);
|
||||
LOG(logINFOBLUE) << "Pushed Listening bunch " << (void*)(buffer);
|
||||
|
||||
// Statistics
|
||||
if (!(*silentMode)) {
|
||||
@ -278,8 +286,8 @@ void Listener::ThreadExecution() {
|
||||
}
|
||||
|
||||
void Listener::StopListening(char *buf) {
|
||||
(*((uint32_t *)buf)) = DUMMY_PACKET_VALUE;
|
||||
fifo->PushAddress(buf);
|
||||
LOG(logINFOBLUE) << "Pushed Listening bunch (EOA) " << (void*)(buf);
|
||||
StopRunning();
|
||||
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber
|
||||
<< ") : " << numPacketsCaught;
|
||||
@ -300,7 +308,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
uint32_t hsize = generalData->headerSizeinPacket;
|
||||
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
|
||||
bool standardheader = generalData->standardheader;
|
||||
if (myDetectorType == GOTTHARD2 && index != 0) {
|
||||
if (vetoThread) {
|
||||
dsize = generalData->vetoDataSize;
|
||||
imageSize = generalData->vetoImageSize;
|
||||
packetSize = generalData->vetoPacketSize;
|
||||
@ -317,15 +325,6 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
memset(buf, 0, fifohsize);
|
||||
new_header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
|
||||
|
||||
// deactivated port (eiger)
|
||||
if (!(*detectorDataStream)) {
|
||||
return 0;
|
||||
}
|
||||
// deactivated (eiger)
|
||||
if (!activated) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// look for carry over
|
||||
if (carryOverFlag) {
|
||||
LOG(logDEBUG3) << index << "carry flag";
|
||||
@ -350,6 +349,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
<< "(Weird), With carry flag: Frame number " << fnum
|
||||
<< " less than current frame number " << currentFrameIndex;
|
||||
carryOverFlag = false;
|
||||
exit(-1);//***************************
|
||||
return 0;
|
||||
}
|
||||
switch (*frameDiscardMode) {
|
||||
@ -514,7 +514,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
|
||||
lastCaughtFrameIndex = fnum;
|
||||
|
||||
LOG(logDEBUG1) << "Listening " << index
|
||||
LOG(logDEBUG) << "Listening " << index
|
||||
<< ": currentfindex:" << currentFrameIndex
|
||||
<< ", fnum:" << fnum << ", pnum:" << pnum
|
||||
<< ", numpackets:" << numpackets;
|
||||
@ -533,6 +533,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
|
||||
// future packet by looking at image number (all other
|
||||
// detectors)
|
||||
if (fnum != currentFrameIndex) {
|
||||
LOG(logINFORED) << "not equal. fnum:" << fnum << " currentfnum:" << currentFrameIndex;
|
||||
carryOverFlag = true;
|
||||
memcpy(carryOverPacket.get(), &listeningPacket[0], packetSize);
|
||||
|
||||
|
@ -65,6 +65,8 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
void ResetParametersforNewAcquisition();
|
||||
void SetGeneralData(GeneralData *g);
|
||||
void SetActivate(bool enable);
|
||||
void SetBunchSize(uint32_t value);
|
||||
|
||||
void CreateUDPSockets();
|
||||
void ShutDownUDPSocket();
|
||||
|
||||
@ -130,6 +132,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
bool activated{false};
|
||||
bool *detectorDataStream;
|
||||
bool *silentMode;
|
||||
bool vetoThread{false};
|
||||
|
||||
/** row hardcoded as 1D or 2d,
|
||||
* if detector does not send them yet or
|
||||
@ -164,6 +167,10 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
|
||||
std::unique_ptr<char[]> listeningPacket;
|
||||
std::atomic<bool> udpSocketAlive{false};
|
||||
|
||||
uint32_t fifoBunchSize{0};
|
||||
/** size in memory including headers */
|
||||
uint32_t fifoBunchSizeBytes{0};
|
||||
|
||||
// for print progress during acquisition*/
|
||||
uint32_t numPacketsStatistic{0};
|
||||
uint32_t numFramesStatistic{0};
|
||||
|
Reference in New Issue
Block a user