Compare commits

...

13 Commits

Author SHA1 Message Date
6964be2aeb fix gui to stream accordingly for g2 bunch 2022-06-10 00:42:40 +02:00
32e3457e84 merge fix 2022-06-09 13:51:37 +02:00
97a2a3d7c1 Merge branch 'developer' into g2rxr 2022-06-09 13:48:54 +02:00
37a8c61124 Merge branch 'developer' into g2rxr 2022-06-09 12:37:30 +02:00
421c2c161a merge developer 2022-06-09 11:43:58 +02:00
59eea1a1fd adding statistics 2022-06-08 16:55:49 +02:00
2b5470ef59 update client api 2022-06-08 16:40:26 +02:00
b5d02ac398 unnecessary print 2022-06-08 16:39:25 +02:00
7536c16a48 print outs 2022-06-08 10:24:45 +02:00
4fe520fdaf fixed 2022-06-08 09:38:47 +02:00
1cfebf667b stuck when it reachs bunchsize 2022-06-07 17:23:39 +02:00
23e4dde063 Merge branch 'developer' into g2rxr 2022-06-07 17:02:47 +02:00
12c2609978 wip 2022-06-07 15:51:58 +02:00
20 changed files with 285 additions and 103 deletions

View File

@ -79,6 +79,7 @@ This document describes the differences between v7.0.0 and v6.x.x
- g2 and m3 clkdiv 2 (system clock) change should affect time settings (g2: exptime, period, delayaftertrigger, burstperiod, m3: exptime, gatedelay, gateperiod, period, delayaftertrigger)
- g2 system frequency is the same irrespective of timing source
- (apparently) rxr doesnt get stuck anymore from 6.1.1
-rx_bunchsize, (default fifodepth for g2 changed to 50)
- rxr mem size changed (fifo header size from 8 to 16) due to sls rxr header = 112.. 112+ 16=128 (reduces packet losss especially for g2)
-udp_srcip and udp_Srcip2: can set to auto (for virtual or 1g data networks)
- set dataset name for all hdf5 files to "data" only

View File

@ -913,6 +913,15 @@ class Detector {
void clearRxROI();
Result<int> getRxBunchSize(Positions pos = {}) const;
/** Number of frames the receiver listens to before pushing into fifo
* (buffer between listener and writer threads).
* Higher number results in fewer locks between fifo access. \n
* Default is 1. */
void setRxBunchSize(int value, Positions pos = {});
///@}
/** @name File */

View File

@ -911,6 +911,7 @@ class CmdProxy {
{"rx_arping", &CmdProxy::rx_arping},
{"rx_roi", &CmdProxy::Rx_ROI},
{"rx_clearroi", &CmdProxy::rx_clearroi},
{"rx_bunchsize", &CmdProxy::rx_bunchsize},// FIXME: rx_fifobunchsize?
/* File */
{"fformat", &CmdProxy::fformat},
@ -1757,6 +1758,10 @@ class CmdProxy {
"Resets Region of interest in receiver. Default is all "
"channels/pixels enabled.");
INTEGER_COMMAND_VEC_ID(
rx_bunchsize, getRxBunchSize, setRxBunchSize, StringTo<int>,
"[n_frames]\n\tSet the number of frames the receiver listens to before pushing into fifo (buffer between listener and writer threads). Higher number results in fewer locks between fifo access. Default is 1. Expect signed 32 bit integer. ");
/* File */
INTEGER_COMMAND_VEC_ID(

View File

@ -1225,6 +1225,14 @@ void Detector::setRxROI(const defs::ROI value) { pimpl->setRxROI(value); }
void Detector::clearRxROI() { pimpl->clearRxROI(); }
Result<int> Detector::getRxBunchSize(Positions pos) const {
return pimpl->Parallel(&Module::getRxBunchSize, pos);
}
void Detector::setRxBunchSize(int value, Positions pos) {
pimpl->Parallel(&Module::setRxBunchSize, pos, value);
}
// File
Result<defs::fileFormat> Detector::getFileFormat(Positions pos) const {

View File

@ -1388,6 +1388,15 @@ void Module::setRxROIMetadata(const slsDetectorDefs::ROI arg) {
sendToReceiver(F_RECEIVER_SET_RECEIVER_ROI_METADATA, arg, nullptr);
}
int Module::getRxBunchSize() const {
return sendToReceiver<int>(F_GET_RECEIVER_BUNCH_SIZE);
}
void Module::setRxBunchSize(int value) {
sendToReceiver<int>(F_SET_RECEIVER_BUNCH_SIZE, value);
}
// File
slsDetectorDefs::fileFormat Module::getFileFormat() const {
return sendToReceiver<fileFormat>(F_GET_RECEIVER_FILE_FORMAT);

View File

@ -294,6 +294,8 @@ class Module : public virtual slsDetectorDefs {
defs::ROI getRxROI() const;
void setRxROI(const slsDetectorDefs::ROI arg);
void setRxROIMetadata(const slsDetectorDefs::ROI arg);
int getRxBunchSize() const;
void setRxBunchSize(int value);
/**************************************************
* *

View File

@ -254,6 +254,7 @@ TEST_CASE("rx_fifodepth", "[.cmd][.rx]") {
proxy.Call("rx_fifodepth", {}, -1, GET, oss);
REQUIRE(oss.str() == "rx_fifodepth 100\n");
}
REQUIRE_THROWS(proxy.Call("rx_fifodepth", {"0"}, -1, PUT));
for (int i = 0; i != det.size(); ++i) {
det.setRxFifoDepth(prev_val[i], {i});
}
@ -513,6 +514,32 @@ TEST_CASE("rx_clearroi", "[.cmd]") {
}
}
TEST_CASE("rx_bunchsize", "[.cmd][.rx]") {
Detector det;
CmdProxy proxy(&det);
auto prev_val = det.getRxBunchSize();
{
std::ostringstream oss;
proxy.Call("rx_bunchsize", {"10"}, -1, PUT, oss);
REQUIRE(oss.str() == "rx_bunchsize 10\n");
}
{
std::ostringstream oss;
proxy.Call("rx_bunchsize", {"100"}, -1, PUT, oss);
REQUIRE(oss.str() == "rx_bunchsize 100\n");
}
{
std::ostringstream oss;
proxy.Call("rx_bunchsize", {}, -1, GET, oss);
REQUIRE(oss.str() == "rx_bunchsize 100\n");
}
REQUIRE_THROWS(proxy.Call("rx_bunchsize", {"0"}, -1, PUT));
for (int i = 0; i != det.size(); ++i) {
det.setRxBunchSize(prev_val[i], {i});
}
}
/* File */
TEST_CASE("fformat", "[.cmd]") {

View File

@ -223,6 +223,8 @@ int ClientInterface::functionTable(){
flist[F_RECEIVER_GET_RECEIVER_ROI] = &ClientInterface::get_receiver_roi;
flist[F_RECEIVER_SET_RECEIVER_ROI] = &ClientInterface::set_receiver_roi;
flist[F_RECEIVER_SET_RECEIVER_ROI_METADATA] = &ClientInterface::set_receiver_roi_metadata;
flist[F_GET_RECEIVER_BUNCH_SIZE] = &ClientInterface::get_bunch_size;
flist[F_SET_RECEIVER_BUNCH_SIZE] = &ClientInterface::set_bunch_size;
for (int i = NUM_DET_FUNCTIONS + 1; i < NUM_REC_FUNCTIONS ; i++) {
LOG(logDEBUG1) << "function fnum: " << i << " (" <<
@ -1176,7 +1178,7 @@ int ClientInterface::get_additional_json_header(Interface &socket) {
int ClientInterface::set_udp_socket_buffer_size(Interface &socket) {
auto size = socket.Receive<int>();
if (size == 0) {
throw RuntimeError("Receiver socket buffer size must be > 0.");
throw RuntimeError("Receiver socket buffer size must be greater than 0.");
}
if (size > 0) {
verifyIdle(socket);
@ -1780,4 +1782,29 @@ int ClientInterface::set_receiver_roi_metadata(Interface &socket) {
return socket.Send(OK);
}
int ClientInterface::get_bunch_size(Interface &socket) {
int retval = static_cast<int>(impl()->getBunchSize());
LOG(logDEBUG1) << "bunch size retval:" << retval;
return socket.sendResult(retval);
}
int ClientInterface::set_bunch_size(Interface &socket) {
auto value = socket.Receive<int>();
if (value <= 0) {
throw RuntimeError("Could not set rx bunch size. Must be greater than 0.");
}
verifyIdle(socket);
LOG(logDEBUG1) << "Setting bunch size:" << value;
try {
impl()->setBunchSize(static_cast<size_t>(value));
} catch (const RuntimeError &e) {
throw RuntimeError("Could not set rx bunch size due to fifo structure memory allocation.");
}
int retval = impl()->getBunchSize();
validate(value, retval, std::string("set bunch size"), DEC);
LOG(logDEBUG1) << "bunch size retval:" << retval;
return socket.sendResult(retval);
}
} // namespace sls

View File

@ -170,6 +170,9 @@ class ClientInterface : private virtual slsDetectorDefs {
int get_receiver_roi(ServerInterface &socket);
int set_receiver_roi(ServerInterface &socket);
int set_receiver_roi_metadata(ServerInterface &socket);
int get_bunch_size(ServerInterface &socket);
int set_bunch_size(ServerInterface &socket);
Implementation *impl() {
if (receiver != nullptr) {

View File

@ -44,6 +44,7 @@ DataProcessor::DataProcessor(int index, detectorType detectorType, Fifo *fifo,
ctbAnalogDataBytes_(ctbAnalogDataBytes) {
LOG(logDEBUG) << "DataProcessor " << index << " created";
vetoThread = (detectorType_ == GOTTHARD2 && index != 0);
}
DataProcessor::~DataProcessor() { DeleteFiles(); }
@ -59,6 +60,10 @@ void DataProcessor::SetReceiverROI(ROI roi) {
receiverRoiEnabled_ = receiverRoi_.completeRoi() ? false : true;
}
void DataProcessor::SetBunchSize(size_t value) {
fifoBunchSize = value;
}
void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag_ = false;
@ -68,6 +73,12 @@ void DataProcessor::ResetParametersforNewAcquisition() {
firstStreamerFrame_ = true;
streamCurrentFrame_ = false;
completeImageToStreamBeforeCropping = make_unique<char[]>(generalData_->imageSize);
fifoBunchSizeBytes = generalData_->imageSize;
if (vetoThread) {
fifoBunchSizeBytes = generalData_->vetoDataSize;
}
fifoBunchSizeBytes += generalData_->fifoBufferHeaderSize;
}
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
@ -249,31 +260,36 @@ std::string DataProcessor::CreateMasterFile(
void DataProcessor::ThreadExecution() {
char *buffer = nullptr;
fifo_->PopAddress(buffer);
LOG(logDEBUG5) << "DataProcessor " << index << ", " << std::hex
<< static_cast<void *>(buffer) << std::dec << ":" << buffer;
LOG(logDEBUG1) << "DataProcessor " << index << ", " << std::hex
<< static_cast<void *>(buffer) << std::dec;
// check dummy
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
bool streamImageInBunch = false;
char* tempBuffer = buffer;
for (size_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
try {
ProcessAnImage(buffer);
} catch (const std::exception &e) {
fifo_->FreeAddress(buffer);
return;
}
// stream (if time/freq to stream) or free
if (streamCurrentFrame_) {
// copy the complete image back if roi enabled
if (receiverRoiEnabled_) {
(*((uint32_t *)buffer)) = generalData_->imageSize;
memcpy(buffer + generalData_->fifoBufferHeaderSize, &completeImageToStreamBeforeCropping[0], generalData_->imageSize);
// end of acquisition (check dummy)
auto numBytes = *reinterpret_cast<uint32_t *>(tempBuffer);
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes << " " << std::hex << static_cast<void *>(tempBuffer) << std::dec;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
try {
ProcessAnImage(tempBuffer);
if (streamCurrentFrame_) {
streamImageInBunch = true;
}
}
// exception from callback
catch (const std::exception &e) {
;
}
tempBuffer += fifoBunchSizeBytes;
}
// stream or free
if (streamImageInBunch) {
fifo_->PushAddressToStream(buffer);
} else {
fifo_->FreeAddress(buffer);
@ -283,7 +299,7 @@ void DataProcessor::ThreadExecution() {
void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
// stream or free
// stream dummy or free
if (*dataStreamEnable_)
fifo_->PushAddressToStream(buf);
else
@ -339,8 +355,13 @@ void DataProcessor::ProcessAnImage(char *buf) {
(uint32_t)(fnum - firstIndex_);
}
streamCurrentFrame_ = true;
// needed to know which one to stream from the bunch
(*((uint32_t *)(buf + FIFO_HEADER_STREAM_ENABLE))) = 1;
} else {
streamCurrentFrame_ = false;
// needed to know which one not to stream from the bunch
(*((uint32_t *)(buf + FIFO_HEADER_STREAM_ENABLE))) = 0;
}
@ -390,6 +411,15 @@ void DataProcessor::ProcessAnImage(char *buf) {
// via stopReceiver tcp)
}
}
// copy the complete image back if roi enabled
if (streamCurrentFrame_) {
if (receiverRoiEnabled_) {
(*((uint32_t *)buf)) = generalData_->imageSize;
memcpy(buf + generalData_->fifoBufferHeaderSize, &completeImageToStreamBeforeCropping[0], generalData_->imageSize);
}
}
}
bool DataProcessor::SendToStreamer() {
@ -425,11 +455,11 @@ bool DataProcessor::CheckTimer() {
}
bool DataProcessor::CheckCount() {
if (currentFreqCount_ == *streamingFrequency_) {
if (currentFreqCount_ >= *streamingFrequency_) {
currentFreqCount_ = 1;
return true;
}
currentFreqCount_++;
++currentFreqCount_;
return false;
}
@ -455,7 +485,7 @@ void DataProcessor::PadMissingPackets(char *buf) {
sls_bitset pmask = header->packetsMask;
uint32_t dsize = generalData_->dataSize;
if (detectorType_ == GOTTHARD2 && index != 0) {
if (vetoThread) {
dsize = generalData_->vetoDataSize;
}
uint32_t fifohsize = generalData_->fifoBufferHeaderSize;

View File

@ -44,6 +44,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
void SetReceiverROI(ROI roi);
void ResetParametersforNewAcquisition();
void SetGeneralData(GeneralData *generalData);
void SetBunchSize(size_t value);
void CloseFiles();
void DeleteFiles();
@ -156,6 +157,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
bool activated_{false};
ROI receiverRoi_{};
bool receiverRoiEnabled_{false};
bool vetoThread{false};
std::unique_ptr<char[]> completeImageToStreamBeforeCropping;
/** if 0, sending random images with a timer */
uint32_t *streamingFrequency_;
@ -183,6 +186,10 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
File *dataFile_{nullptr};
size_t fifoBunchSize{0};
/** size in memory including headers */
size_t fifoBunchSizeBytes{0};
// call back
/**
* Call back for raw data

View File

@ -18,14 +18,15 @@ namespace sls {
const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int ind, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi,
DataStreamer::DataStreamer(int ind, detectorType dType, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi,
bool fr, slsDetectorDefs::xy np, bool *qe,
uint64_t *tot)
: ThreadObject(ind, TypeName), fifo(f), dynamicRange(dr), detectorRoi(r),
: ThreadObject(ind, TypeName), fifo(f), detType(dType), dynamicRange(dr), detectorRoi(r),
fileIndex(fi), flipRows(fr), numPorts(np), quadEnable(qe),
totalNumFrames(tot) {
LOG(logDEBUG) << "DataStreamer " << ind << " created";
vetoThread = (detType == GOTTHARD2 && index != 0);
}
DataStreamer::~DataStreamer() {
@ -50,6 +51,11 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) {
completeBuffer = new char[generalData->imageSizeComplete];
memset(completeBuffer, 0, generalData->imageSizeComplete);
}
fifoBunchSizeBytes = generalData->imageSize;
if (vetoThread) {
fifoBunchSizeBytes = generalData->vetoDataSize;
}
fifoBunchSizeBytes += generalData->fifoBufferHeaderSize;
}
void DataStreamer::RecordFirstIndex(uint64_t fnum, char *buf) {
@ -75,6 +81,10 @@ void DataStreamer::SetAdditionalJsonHeader(
isAdditionalJsonUpdated = true;
}
void DataStreamer::SetBunchSize(size_t value) {
fifoBunchSize = value;
}
void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port,
const IpAddr ip, int hwm) {
uint32_t portnum = port + index;
@ -115,15 +125,23 @@ void DataStreamer::ThreadExecution() {
"pop 0x"
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
// check dummy
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
char* tempBuffer = buffer;
for (size_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
ProcessAnImage(buffer);
// end of acquisition (check dummy)
auto numBytes = *reinterpret_cast<uint32_t *>(tempBuffer);
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
// process only if it needs to be streamed
if (*((uint32_t *)(tempBuffer + FIFO_HEADER_STREAM_ENABLE))) {
ProcessAnImage(tempBuffer);
}
tempBuffer += fifoBunchSizeBytes;
}
// free
fifo->FreeAddress(buffer);

View File

@ -30,6 +30,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
* Calls Base Class CreateThread(), sets ErrorMask if error and increments
* NumberofDataStreamers
* @param ind self index
* @param dType detector type
* @param f address of Fifo pointer
* @param dr pointer to dynamic range
* @param r detectorRoi
@ -39,7 +40,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
* @param qe pointer to quad Enable
* @param tot pointer to total number of frames
*/
DataStreamer(int ind, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi, bool fr,
DataStreamer(int ind, detectorType dType, Fifo *f, uint32_t *dr, ROI *r, uint64_t *fi, bool fr,
xy np, bool *qe, uint64_t *tot);
/**
@ -55,6 +56,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
void SetFlipRows(bool fd);
void
SetAdditionalJsonHeader(const std::map<std::string, std::string> &json);
void SetBunchSize(size_t value);
/**
* Creates Zmq Sockets
@ -105,6 +107,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
static const std::string TypeName;
const GeneralData *generalData{nullptr};
Fifo *fifo;
detectorType detType;
ZmqSocket *zmqSocket{nullptr};
uint32_t *dynamicRange;
ROI *detectorRoi;
@ -112,6 +115,7 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
uint64_t *fileIndex;
bool flipRows;
std::map<std::string, std::string> additionalJsonHeader;
bool vetoThread{false};
/** Used by streamer thread to update local copy (reduce number of locks
* during streaming) */
@ -132,6 +136,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
xy numPorts{1, 1};
bool *quadEnable;
uint64_t *totalNumFrames;
size_t fifoBunchSize{0};
/** size in memory including headers */
size_t fifoBunchSizeBytes{0};
};
} // namespace sls

View File

@ -65,6 +65,7 @@ class GeneralData {
uint32_t adcEnableMaskTenGiga{BIT32_MASK};
slsDetectorDefs::ROI roi{};
uint32_t counterMask{0};
uint32_t defaultBunchSize{1};
GeneralData(){};
virtual ~GeneralData(){};
@ -445,10 +446,11 @@ class Gotthard2Data : public GeneralData {
maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 50000;
defaultFifoDepth = 50;
standardheader = true;
vetoDataSize = 160;
vetoHsize = 16;
defaultBunchSize = 10000;
UpdateImageSize();
};

View File

@ -67,16 +67,17 @@ void Implementation::SetThreadPriorities() {
void Implementation::SetupFifoStructure() {
fifo.clear();
for (int i = 0; i < numUDPInterfaces; ++i) {
uint32_t datasize = generalData->imageSize;
size_t datasize = generalData->imageSize;
// veto data size
if (detType == GOTTHARD2 && i != 0) {
datasize = generalData->vetoImageSize;
}
datasize += generalData->fifoBufferHeaderSize;
datasize *= bunchSize;
// create fifo structure
try {
fifo.push_back(sls::make_unique<Fifo>(
i, datasize + (generalData->fifoBufferHeaderSize), fifoDepth));
fifo.push_back(sls::make_unique<Fifo>(i, datasize, fifoDepth));
} catch (...) {
fifo.clear();
fifoDepth = 0;
@ -93,9 +94,7 @@ void Implementation::SetupFifoStructure() {
dataStreamer[i]->SetFifo(fifo[i].get());
LOG(logINFO) << "Memory Allocated for Fifo " << i << ": "
<< (double)(((size_t)(datasize) +
(size_t)(generalData->fifoBufferHeaderSize)) *
(size_t)fifoDepth) /
<< (double)(datasize * (size_t)fifoDepth) /
(double)(1024 * 1024)
<< " MB";
}
@ -169,6 +168,7 @@ void Implementation::setDetectorType(const detectorType d) {
adcEnableMaskTenGiga = generalData->adcEnableMaskTenGiga;
detectorRoi = generalData->roi;
counterMask = generalData->counterMask;
bunchSize = generalData->defaultBunchSize;
SetLocalNetworkParameters();
SetupFifoStructure();
@ -204,10 +204,12 @@ void Implementation::setDetectorType(const detectorType d) {
for (const auto &it : listener) {
it->SetGeneralData(generalData);
it->SetActivate(activated);
it->SetBunchSize(bunchSize);
}
for (const auto &it : dataProcessor) {
it->SetGeneralData(generalData);
it->SetActivate(activated);
it->SetBunchSize(bunchSize);
}
SetThreadPriorities();
@ -453,6 +455,22 @@ void Implementation::setReceiverROIMetadata(const ROI arg) {
LOG(logINFO) << "receiver roi Metadata: " << ToString(receiverRoiMetadata);
}
size_t Implementation::getBunchSize() const { return bunchSize; }
void Implementation::setBunchSize(const size_t i) {
if (bunchSize != i) {
bunchSize = i;
for (const auto &it : listener)
it->SetBunchSize(i);
for (const auto &it : dataProcessor)
it->SetBunchSize(i);
for (const auto &it : dataStreamer)
it->SetBunchSize(i);
SetupFifoStructure();
}
LOG(logINFO) << "Fifo Bunch Size: " << i;
}
/**************************************************
* *
* File Parameters *
@ -675,12 +693,15 @@ void Implementation::stopReceiver() {
while (running) {
running = false;
for (const auto &it : listener)
if (it->IsRunning())
if (it->IsRunning()) {
running = true;
//LOG(logINFOBLUE) << "listener NOT done";
}
for (const auto &it : dataProcessor)
if (it->IsRunning())
if (it->IsRunning()) {
running = true;
//LOG(logINFOBLUE) << "processor NOT done";
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
@ -1002,6 +1023,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
&silentMode));
listener[i]->SetGeneralData(generalData);
listener[i]->SetActivate(activated);
listener[i]->SetBunchSize(bunchSize);
int ctbAnalogDataBytes = 0;
if (detType == CHIPTESTBOARD) {
@ -1016,6 +1038,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
dataProcessor[i]->SetGeneralData(generalData);
dataProcessor[i]->SetActivate(activated);
dataProcessor[i]->SetReceiverROI(portRois[i]);
dataProcessor[i]->SetBunchSize(bunchSize);
} catch (...) {
listener.clear();
dataProcessor.clear();
@ -1031,7 +1054,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
flip = (i == 1 ? true : false);
}
dataStreamer.push_back(sls::make_unique<DataStreamer>(
i, fifo[i].get(), &dynamicRange, &detectorRoi,
i, detType, fifo[i].get(), &dynamicRange, &detectorRoi,
&fileIndex, flip, numPorts, &quadEnable,
&numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
@ -1040,6 +1063,7 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
dataStreamer[i]->SetBunchSize(bunchSize);
} catch (...) {
if (dataStreamEnable) {
dataStreamer.clear();
@ -1158,7 +1182,7 @@ void Implementation::setDataStreamEnable(const bool enable) {
flip = (i == 1 ? true : false);
}
dataStreamer.push_back(sls::make_unique<DataStreamer>(
i, fifo[i].get(), &dynamicRange, &detectorRoi,
i, detType, fifo[i].get(), &dynamicRange, &detectorRoi,
&fileIndex, flip, numPorts, &quadEnable,
&numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
@ -1167,6 +1191,7 @@ void Implementation::setDataStreamEnable(const bool enable) {
streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
dataStreamer[i]->SetBunchSize(bunchSize);
} catch (...) {
dataStreamer.clear();
dataStreamEnable = false;

View File

@ -59,6 +59,8 @@ class Implementation : private virtual slsDetectorDefs {
ROI getReceiverROI() const;
void setReceiverROI(const ROI arg);
void setReceiverROIMetadata(const ROI arg);
size_t getBunchSize() const;
void setBunchSize(const size_t i);
/**************************************************
* *
@ -311,6 +313,7 @@ class Implementation : private virtual slsDetectorDefs {
std::array<ROI, 2> portRois{};
// receiver roi for complete detector for metadata
ROI receiverRoiMetadata{};
size_t bunchSize{0};
// file parameters
fileFormat fileFormatType{BINARY};

View File

@ -32,6 +32,7 @@ Listener::Listener(int ind, detectorType dtype, Fifo *f,
actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp),
detectorDataStream(detds), silentMode(sm) {
LOG(logDEBUG) << "Listener " << ind << " created";
vetoThread = (myDetectorType == GOTTHARD2 && index != 0);
}
Listener::~Listener() = default;
@ -85,7 +86,7 @@ void Listener::ResetParametersforNewAcquisition() {
lastCaughtFrameIndex = 0;
carryOverFlag = false;
uint32_t packetSize = generalData->packetSize;
if (myDetectorType == GOTTHARD2 && index != 0) {
if (vetoThread) {
packetSize = generalData->vetoPacketSize;
}
carryOverPacket = make_unique<char[]>(packetSize);
@ -98,6 +99,12 @@ void Listener::ResetParametersforNewAcquisition() {
// reset fifo statistic
fifo->GetMaxLevelForFifoBound();
fifo->GetMinLevelForFifoFree();
fifoBunchSizeBytes = generalData->imageSize;
if (vetoThread) {
fifoBunchSizeBytes = generalData->vetoDataSize;
}
fifoBunchSizeBytes += generalData->fifoBufferHeaderSize;
}
void Listener::RecordFirstIndex(uint64_t fnum) {
@ -119,6 +126,10 @@ void Listener::SetGeneralData(GeneralData *g) { generalData = g; }
void Listener::SetActivate(bool enable) { activated = enable; }
void Listener::SetBunchSize(size_t value) {
fifoBunchSize = value;
}
void Listener::CreateUDPSockets() {
if (!activated || !(*detectorDataStream)) {
return;
@ -135,7 +146,7 @@ void Listener::CreateUDPSockets() {
ShutDownUDPSocket();
uint32_t packetSize = generalData->packetSize;
if (myDetectorType == GOTTHARD2 && index != 0) {
if (vetoThread) {
packetSize = generalData->vetoPacketSize;
}
@ -184,7 +195,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
}
uint32_t packetSize = generalData->packetSize;
if (myDetectorType == GOTTHARD2 && index != 0) {
if (vetoThread) {
packetSize = generalData->vetoPacketSize;
}
@ -220,55 +231,42 @@ void Listener::SetHardCodedPosition(uint16_t r, uint16_t c) {
void Listener::ThreadExecution() {
char *buffer;
int rc = 0;
fifo->GetNewAddress(buffer);
LOG(logDEBUG5) << "Listener " << index
LOG(logDEBUG1) << "Listener " << index
<< ", "
"pop 0x"
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
// udpsocket doesnt exist
if (activated && *detectorDataStream && !udpSocketAlive && !carryOverFlag) {
// LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not
// created or shut down earlier";
(*((uint32_t *)buffer)) = 0;
StopListening(buffer);
return;
}
// get data
if ((*status != TRANSMITTING &&
(!activated || !(*detectorDataStream) || udpSocketAlive)) ||
carryOverFlag) {
rc = ListenToAnImage(buffer);
}
// error check, (should not be here) if not transmitting yet (previous if)
// rc should be > 0
if (rc == 0) {
if (!udpSocketAlive) {
(*((uint32_t *)buffer)) = 0;
char* tempBuffer = buffer;
for (size_t iFrame = 0; iFrame != fifoBunchSize; iFrame ++) {
// end of acquisition or not activated
if ((*status == TRANSMITTING || !udpSocketAlive) && !carryOverFlag) {
(*((uint32_t *)tempBuffer)) = DUMMY_PACKET_VALUE;
StopListening(buffer);
} else
fifo->FreeAddress(buffer);
return;
}
return;
}
int rc = ListenToAnImage(tempBuffer);
// discarding image
else if (rc < 0) {
fifo->FreeAddress(buffer);
return;
// socket closed or discarding image (free retake)
// weird frame numbers (print and rc = 0), then retake
if (rc <= 0) {
if (udpSocketAlive) {
--iFrame;
}
} else {
(*((uint32_t *)tempBuffer)) = rc;
tempBuffer += fifoBunchSizeBytes;
numFramesStatistic++;
}
}
(*((uint32_t *)buffer)) = rc;
// push into fifo
fifo->PushAddress(buffer);
// Statistics
if (!(*silentMode)) {
numFramesStatistic++;
if (numFramesStatistic >=
// second condition also for infinite #number of frames
(((*framesPerFile) == 0) ? STATISTIC_FRAMENUMBER_INFINITE
@ -278,12 +276,10 @@ void Listener::ThreadExecution() {
}
void Listener::StopListening(char *buf) {
(*((uint32_t *)buf)) = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber
LOG(logDEBUG1) << index << ": Listening Completed (" << *udpPortNumber
<< ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
}
/* buf includes the fifo header and packet header */
@ -300,7 +296,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
uint32_t hsize = generalData->headerSizeinPacket;
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
bool standardheader = generalData->standardheader;
if (myDetectorType == GOTTHARD2 && index != 0) {
if (vetoThread) {
dsize = generalData->vetoDataSize;
imageSize = generalData->vetoImageSize;
packetSize = generalData->vetoPacketSize;
@ -317,15 +313,6 @@ uint32_t Listener::ListenToAnImage(char *buf) {
memset(buf, 0, fifohsize);
new_header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
// deactivated port (eiger)
if (!(*detectorDataStream)) {
return 0;
}
// deactivated (eiger)
if (!activated) {
return 0;
}
// look for carry over
if (carryOverFlag) {
LOG(logDEBUG3) << index << "carry flag";
@ -350,6 +337,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
<< "(Weird), With carry flag: Frame number " << fnum
<< " less than current frame number " << currentFrameIndex;
carryOverFlag = false;
exit(-1);//***************************
return 0;
}
switch (*frameDiscardMode) {
@ -514,7 +502,7 @@ uint32_t Listener::ListenToAnImage(char *buf) {
lastCaughtFrameIndex = fnum;
LOG(logDEBUG1) << "Listening " << index
LOG(logDEBUG) << "Listening " << index
<< ": currentfindex:" << currentFrameIndex
<< ", fnum:" << fnum << ", pnum:" << pnum
<< ", numpackets:" << numpackets;

View File

@ -65,6 +65,8 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void ResetParametersforNewAcquisition();
void SetGeneralData(GeneralData *g);
void SetActivate(bool enable);
void SetBunchSize(size_t value);
void CreateUDPSockets();
void ShutDownUDPSocket();
@ -130,6 +132,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
bool activated{false};
bool *detectorDataStream;
bool *silentMode;
bool vetoThread{false};
/** row hardcoded as 1D or 2d,
* if detector does not send them yet or
@ -164,6 +167,10 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
std::unique_ptr<char[]> listeningPacket;
std::atomic<bool> udpSocketAlive{false};
size_t fifoBunchSize{0};
/** size in memory including headers */
size_t fifoBunchSizeBytes{0};
// for print progress during acquisition*/
uint32_t numPacketsStatistic{0};
uint32_t numFramesStatistic{0};

View File

@ -39,8 +39,7 @@ namespace sls {
// fifo
#define FIFO_HEADER_NUMBYTES (16)
#define FIFO_DATASIZE_NUMBYTES (4)
#define FIFO_PADDING_NUMBYTES \
(4) // for 8 byte alignment due to sls_receiver_header structure
#define FIFO_HEADER_STREAM_ENABLE (8)
// hdf5
#define MAX_CHUNKED_IMAGES (1)

View File

@ -379,6 +379,8 @@ enum detFuncs {
F_RECEIVER_GET_RECEIVER_ROI,
F_RECEIVER_SET_RECEIVER_ROI,
F_RECEIVER_SET_RECEIVER_ROI_METADATA,
F_GET_RECEIVER_BUNCH_SIZE,
F_SET_RECEIVER_BUNCH_SIZE,
NUM_REC_FUNCTIONS
};
@ -756,6 +758,8 @@ const char* getFunctionNameFromEnum(enum detFuncs func) {
case F_RECEIVER_GET_RECEIVER_ROI: return "F_RECEIVER_GET_RECEIVER_ROI";
case F_RECEIVER_SET_RECEIVER_ROI: return "F_RECEIVER_SET_RECEIVER_ROI";
case F_RECEIVER_SET_RECEIVER_ROI_METADATA: return "F_RECEIVER_SET_RECEIVER_ROI_METADATA";
case F_GET_RECEIVER_BUNCH_SIZE: return "F_GET_RECEIVER_BUNCH_SIZE";
case F_SET_RECEIVER_BUNCH_SIZE: return "F_SET_RECEIVER_BUNCH_SIZE";
case NUM_REC_FUNCTIONS: return "NUM_REC_FUNCTIONS";
default: return "Unknown Function";