From bb32b2f653a0a8ee1f9371b3d33db088af413b0b Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Fri, 24 Apr 2020 15:13:37 +0200 Subject: [PATCH] rxr done WIP --- slsDetectorSoftware/src/DetectorImpl.cpp | 4 +- slsDetectorSoftware/src/Receiver.cpp | 4 + slsReceiverSoftware/src/ClientInterface.cpp | 101 +-- slsReceiverSoftware/src/ClientInterface.h | 3 - slsReceiverSoftware/src/DataStreamer.cpp | 16 +- slsReceiverSoftware/src/DataStreamer.h | 14 +- slsReceiverSoftware/src/GeneralData.h | 8 - slsReceiverSoftware/src/Implementation.cpp | 702 +++++++------------- slsReceiverSoftware/src/Implementation.h | 29 +- slsReceiverSoftware/src/Listener.cpp | 2 +- slsReceiverSoftware/src/Listener.h | 4 +- slsSupportLib/include/ZmqSocket.h | 8 +- slsSupportLib/include/sls_detector_funcs.h | 4 - slsSupportLib/src/ZmqSocket.cpp | 8 +- 14 files changed, 314 insertions(+), 593 deletions(-) diff --git a/slsDetectorSoftware/src/DetectorImpl.cpp b/slsDetectorSoftware/src/DetectorImpl.cpp index 0bcb9566e..f8b8ea994 100755 --- a/slsDetectorSoftware/src/DetectorImpl.cpp +++ b/slsDetectorSoftware/src/DetectorImpl.cpp @@ -736,8 +736,8 @@ void DetectorImpl::readFrameFromReceiver() { nPixelsX = zHeader.npixelsx; nPixelsY = zHeader.npixelsy; // detector shape - nX = zHeader.ndetx; - nY = zHeader.ndety; + nX = zHeader.nSocketX; + nY = zHeader.nSocketY; nY *= numInterfaces; nDetPixelsX = nX * nPixelsX; nDetPixelsY = nY * nPixelsY; diff --git a/slsDetectorSoftware/src/Receiver.cpp b/slsDetectorSoftware/src/Receiver.cpp index 743bae302..3ca6a14e5 100755 --- a/slsDetectorSoftware/src/Receiver.cpp +++ b/slsDetectorSoftware/src/Receiver.cpp @@ -140,6 +140,10 @@ sls::MacAddr Receiver::configure(slsDetectorDefs::rxParameters arg) { memcpy(&arg.zmq_ip, &ip, sizeof(ip)); } + if (arg.detType == EIGER) { + arg.udpInterfaces = 2; + } + LOG(logDEBUG1) << "detType:" << arg.detType << std::endl << "detectorSize.x:" << arg.detectorSize.x << std::endl diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index 405d8a295..7127f9162 100755 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -95,7 +95,7 @@ void ClientInterface::startTCPServer() { } if (receiver) { - receiver->shutDownUDPSockets(); + receiver->shutDownUDPSocket(); } LOG(logINFOBLUE) << "Exiting [ TCP server Tid: " << syscall(SYS_gettid) << "]"; } @@ -186,10 +186,8 @@ int ClientInterface::functionTable(){ flist[F_SET_RECEIVER_QUAD] = &ClientInterface::set_quad_type; flist[F_SET_RECEIVER_READ_N_LINES] = &ClientInterface::set_read_n_lines; flist[F_SET_RECEIVER_UDP_IP] = &ClientInterface::set_udp_ip; - flist[F_SET_RECEIVER_UDP_IP2] = &ClientInterface::set_udp_ip2; flist[F_SET_RECEIVER_UDP_PORT] = &ClientInterface::set_udp_port; - flist[F_SET_RECEIVER_UDP_PORT2] = &ClientInterface::set_udp_port2; - flist[F_SET_RECEIVER_NUM_INTERFACES] = &ClientInterface::set_num_interfaces; + flist[F_SET_RECEIVER_NUM_INTERFACES] = &ClientInterface::set_num_interfaces; flist[F_RECEIVER_SET_ADC_MASK_10G] = &ClientInterface::set_adc_mask_10g; flist[F_RECEIVER_SET_NUM_COUNTERS] = &ClientInterface::set_num_counters; flist[F_INCREMENT_FILE_INDEX] = &ClientInterface::increment_file_index; @@ -392,22 +390,24 @@ int ClientInterface::setup_receiver(Interface &socket) { impl()->setDetectorHostname(arg.hostname); // udp setup - sls::MacAddr retvals[2]; // primary interface.. only udpip, else udpip2 - if (arg.udp_dstmac == 0 && arg.udp_dstip != 0) { - retvals[0] = setUdpIp(sls::IpAddr(arg.udp_dstip)); - } - if (arg.udp_dstmac2 == 0 && arg.udp_dstip2 != 0) { - retvals[1] = setUdpIp2(sls::IpAddr(arg.udp_dstip2)); - } - impl()->setUDPPortNumber(arg.udp_dstport); - impl()->setUDPPortNumber2(arg.udp_dstport2); - if (myDetectorType == JUNGFRAU) { - try { - impl()->setNumberofUDPInterfaces(arg.udpInterfaces); - } catch(const RuntimeError &e) { - throw RuntimeError("Failed to set number of interfaces to " + - std::to_string(arg.udpInterfaces)); + impl()->setInterfaceId(arg.interfaceId); + sls::MacAddr retval; + if (arg.interfaceId == 0) { + if (arg.udp_dstmac == 0 && arg.udp_dstip != 0) { + retval = setUdpIp(sls::IpAddr(arg.udp_dstip)); } + impl()->setUDPPortNumber(arg.udp_dstport); + } else { + if (arg.udp_dstmac2 == 0 && arg.udp_dstip2 != 0) { + retval = setUdpIp(sls::IpAddr(arg.udp_dstip2)); + } + impl()->setUDPPortNumber(arg.udp_dstport2); + } + try { + impl()->setNumberofUDPInterfaces(arg.udpInterfaces); + } catch(const RuntimeError &e) { + throw RuntimeError("Failed to set number of interfaces to " + + std::to_string(arg.udpInterfaces)); } impl()->setUDPSocketBufferSize(0); @@ -509,7 +509,7 @@ int ClientInterface::setup_receiver(Interface &socket) { } impl()->setStreamingSourceIP(ip); } - return socket.sendResult(retvals); + return socket.sendResult(retval); } void ClientInterface::setDetectorType(detectorType arg) { @@ -532,6 +532,7 @@ void ClientInterface::setDetectorType(detectorType arg) { receiver = sls::make_unique(arg); myDetectorType = arg; } catch (...) { + receiver.reset(); throw RuntimeError("Could not set detector type"); } @@ -904,15 +905,9 @@ int ClientInterface::get_frame_index(Interface &socket) { } int ClientInterface::get_missing_packets(Interface &socket) { - std::vector m = impl()->getNumMissingPackets(); - LOG(logDEBUG1) << "missing packets:" << sls::ToString(m); - int retvalsize = m.size(); - uint64_t retval[retvalsize]; - std::copy(std::begin(m), std::end(m), retval); - socket.Send(OK); - socket.Send(&retvalsize, sizeof(retvalsize)); - socket.Send(retval, sizeof(retval)); - return OK; + uint64_t retval = impl()->getNumMissingPackets(); + LOG(logDEBUG1) << "missing packets:" << retval; + return socket.sendResult(retval); } int ClientInterface::get_frames_caught(Interface &socket) { @@ -1564,9 +1559,7 @@ sls::MacAddr ClientInterface::setUdpIp(sls::IpAddr arg) { LOG(logERROR) << "Failed to get udp ethernet interface from IP " << arg << ". Got " << eth; } impl()->setEthernetInterface(eth); - if (myDetectorType == EIGER) { - impl()->setEthernetInterface2(eth); - } + // get mac address auto retval = sls::InterfaceNameToMac(eth); if (retval == 0) { @@ -1584,38 +1577,6 @@ int ClientInterface::set_udp_ip(Interface &socket) { return socket.sendResult(retval); } -sls::MacAddr ClientInterface::setUdpIp2(sls::IpAddr arg) { - // getting eth - std::string eth = sls::IpToInterfaceName(arg.str()); - if (eth == "none") { - throw RuntimeError("Failed to get udp ethernet interface2 from IP " + arg.str()); - } - if (eth.find('.') != std::string::npos) { - eth = ""; - LOG(logERROR) << "Failed to get udp ethernet interface2 from IP " << arg << ". Got " << eth; - } - impl()->setEthernetInterface2(eth); - - // get mac address - auto retval = sls::InterfaceNameToMac(eth); - if (retval == 0) { - throw RuntimeError("Failed to get udp mac adddress2 to listen to (eth:" + eth + ", ip:" + arg.str() + ")\n"); - } - return retval; -} - -int ClientInterface::set_udp_ip2(Interface &socket) { - auto arg = socket.Receive(); - verifyIdle(socket); - if (myDetectorType != JUNGFRAU) { - throw RuntimeError("UDP Destination IP2 not implemented for this detector"); - } - LOG(logINFO) << "Received UDP IP2: " << arg; - auto retval = setUdpIp2(arg); - LOG(logINFO) << "Receiver MAC Address2: " << retval; - return socket.sendResult(retval); -} - int ClientInterface::set_udp_port(Interface &socket) { auto arg = socket.Receive(); verifyIdle(socket); @@ -1625,18 +1586,6 @@ int ClientInterface::set_udp_port(Interface &socket) { return OK; } -int ClientInterface::set_udp_port2(Interface &socket) { - auto arg = socket.Receive(); - verifyIdle(socket); - if (myDetectorType != JUNGFRAU && myDetectorType != EIGER) { - throw RuntimeError("UDP Destination Port2 not implemented for this detector"); - } - LOG(logDEBUG1) << "Setting UDP Port:" << arg; - impl()->setUDPPortNumber2(arg); - socket.Send(OK); - return OK; -} - int ClientInterface::set_num_interfaces(Interface &socket) { auto arg = socket.Receive(); arg = (arg > 1 ? 2 : 1); diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index f9b029946..f63f05325 100755 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -146,10 +146,7 @@ class ClientInterface : private virtual slsDetectorDefs { int set_read_n_lines(sls::ServerInterface &socket); sls::MacAddr setUdpIp(sls::IpAddr arg); int set_udp_ip(sls::ServerInterface &socket); - sls::MacAddr setUdpIp2(sls::IpAddr arg); - int set_udp_ip2(sls::ServerInterface &socket); int set_udp_port(sls::ServerInterface &socket); - int set_udp_port2(sls::ServerInterface &socket); int set_num_interfaces(sls::ServerInterface &socket); int set_adc_mask_10g(sls::ServerInterface &socket); int set_num_counters(sls::ServerInterface &socket); diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index ccec6db9f..ac3620df0 100755 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -17,7 +17,7 @@ const std::string DataStreamer::TypeName = "DataStreamer"; DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, - uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot) : + uint64_t* fi, int fd, int* nr, bool* qe, uint64_t* tot) : ThreadObject(ind, TypeName), fifo(f), dynamicRange(dr), @@ -27,8 +27,8 @@ DataStreamer::DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, quadEnable(qe), totalNumFrames(tot) { - numDet[0] = nd[0]; - numDet[1] = nd[1]; + numRx[0] = nr[0]; + numRx[1] = nr[1]; LOG(logDEBUG) << "DataStreamer " << ind << " created"; } @@ -74,9 +74,9 @@ void DataStreamer::SetGeneralData(GeneralData* g) { generalData->Print(); } -void DataStreamer::SetNumberofDetectors(int* nd) { - numDet[0] = nd[0]; - numDet[1] = nd[1]; +void DataStreamer::SetReceiverShape(int* nr) { + numRx[0] = nr[0]; + numRx[1] = nr[1]; } void DataStreamer::SetFlippedDataX(int fd) { @@ -211,8 +211,8 @@ int DataStreamer::SendHeader(sls_receiver_header* rheader, uint32_t size, uint32 zHeader.dynamicRange = *dynamicRange; zHeader.fileIndex = *fileIndex; - zHeader.ndetx = numDet[0]; - zHeader.ndety = numDet[1]; + zHeader.nSocketX = numRx[0]; + zHeader.nSocketY = numRx[1]; zHeader.npixelsx = nx; zHeader.npixelsy = ny; zHeader.imageSize = size; diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index 8b7d81881..a02c26b5d 100755 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -29,12 +29,12 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { * @param r roi * @param fi pointer to file index * @param fd flipped data enable for x dimension - * @param nd pointer to number of detectors in each dimension + * @param nd pointer to number of receivers in each dimension * @param qe pointer to quad Enable * @param tot pointer to total number of frames */ DataStreamer(int ind, Fifo* f, uint32_t* dr, ROI* r, - uint64_t* fi, int fd, int* nd, bool* qe, uint64_t* tot); + uint64_t* fi, int fd, int* nr, bool* qe, uint64_t* tot); /** * Destructor @@ -60,10 +60,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { void SetGeneralData(GeneralData* g); /** - * Set number of detectors - * @param number of detectors in both dimensions + * Set receiver shape + * @param number of receivers in both dimensions */ - void SetNumberofDetectors(int* nd); + void SetReceiverShape(int* nr); /** * Set Flipped data enable across x dimension @@ -178,8 +178,8 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { /** Complete buffer used for roi, eg. shortGotthard */ char* completeBuffer{nullptr}; - /** Number of Detectors in X and Y dimension */ - int numDet[2]; + /** Number of Recievers in X and Y dimension */ + int numRx[2]; /** Quad Enable */ bool* quadEnable; diff --git a/slsReceiverSoftware/src/GeneralData.h b/slsReceiverSoftware/src/GeneralData.h index 4aec7f4ff..1a5dcd536 100755 --- a/slsReceiverSoftware/src/GeneralData.h +++ b/slsReceiverSoftware/src/GeneralData.h @@ -64,9 +64,6 @@ public: /** Default Fifo depth */ uint32_t defaultFifoDepth; - /** Threads per receiver */ - uint32_t threadsPerReceiver; - /** Size of a header packet */ uint32_t headerPacketSize; @@ -105,7 +102,6 @@ public: maxFramesPerFile(0), fifoBufferHeaderSize(0), defaultFifoDepth(0), - threadsPerReceiver(1), headerPacketSize(0), nPixelsXComplete(0), nPixelsYComplete(0), @@ -233,7 +229,6 @@ public: LOG(level) << "Max Frames Per File: " << maxFramesPerFile; LOG(level) << "Fifo Buffer Header Size: " << fifoBufferHeaderSize; LOG(level) << "Default Fifo Depth: " << defaultFifoDepth; - LOG(level) << "Threads Per Receiver: " << threadsPerReceiver; LOG(level) << "Header Packet Size: " << headerPacketSize; LOG(level) << "Complete Pixels X: " << nPixelsXComplete; LOG(level) << "Complete Pixels Y: " << nPixelsYComplete; @@ -415,7 +410,6 @@ class EigerData : public GeneralData { maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE; fifoBufferHeaderSize= FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 1000; - threadsPerReceiver = 2; headerPacketSize = 40; standardheader = true; }; @@ -479,7 +473,6 @@ class JungfrauData : public GeneralData { nPixelsY = 256; packetsPerFrame = 64; imageSize = dataSize * packetsPerFrame; - threadsPerReceiver = 2; defaultUdpSocketBufferSize = (500 * 1024 * 1024); } @@ -488,7 +481,6 @@ class JungfrauData : public GeneralData { nPixelsY = 512; packetsPerFrame = 128; imageSize = dataSize * packetsPerFrame; - threadsPerReceiver = 1; defaultUdpSocketBufferSize = (1000 * 1024 * 1024); } } diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index a8d9204bc..39c2cd56e 100755 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -38,23 +38,22 @@ void Implementation::DeleteMembers() { } additionalJsonHeader.clear(); - listener.clear(); - dataProcessor.clear(); - dataStreamer.clear(); - fifo.clear(); - eth.clear(); - udpPortNum.clear(); ctbDbitList.clear(); + listener.reset(); + dataProcessor.reset(); + dataStreamer.reset(); + fifo.reset(); } void Implementation::InitializeMembers() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; // config parameters - numThreads = 1; myDetectorType = GENERIC; - for (int i = 0; i < MAX_DIMENSIONS; ++i) + for (int i = 0; i < MAX_DIMENSIONS; ++i) { numDet[i] = 0; + numRx[i] = 0; + } detID = 0; detHostname = ""; silentMode = false; @@ -77,13 +76,10 @@ void Implementation::InitializeMembers() { stoppedFlag = false; // network configuration (UDP) + interfaceId = 0; numUDPInterfaces = 1; - eth.resize(MAX_NUMBER_OF_LISTENING_THREADS); - udpPortNum.resize(MAX_NUMBER_OF_LISTENING_THREADS); - for (int i = 0; i < MAX_NUMBER_OF_LISTENING_THREADS; ++i) { - eth[i] = ""; - udpPortNum[i] = DEFAULT_UDP_PORTNO + i; - } + eth = ""; + udpPortNum = DEFAULT_UDP_PORTNO; udpSocketBufferSize = 0; actualUDPSocketBufferSize = 0; @@ -168,44 +164,34 @@ void Implementation::SetLocalNetworkParameters() { void Implementation::SetThreadPriorities() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - for (const auto &it : listener) { - it->SetThreadPriority(LISTENER_PRIORITY); - } + listener->SetThreadPriority(LISTENER_PRIORITY); } void Implementation::SetupFifoStructure() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - fifo.clear(); - for (int i = 0; i < numThreads; ++i) { - - // create fifo structure - try { - fifo.push_back(sls::make_unique( - i, - (generalData->imageSize) + (generalData->fifoBufferHeaderSize), - fifoDepth)); - } catch (...) { - fifo.clear(); - fifoDepth = 0; - throw sls::RuntimeError("Could not allocate memory for fifo structure " + - std::to_string(i) + ". FifoDepth is now 0."); - } - // set the listener & dataprocessor threads to point to the right fifo - if (listener.size()) - listener[i]->SetFifo(fifo[i].get()); - if (dataProcessor.size()) - dataProcessor[i]->SetFifo(fifo[i].get()); - if (dataStreamer.size()) - dataStreamer[i]->SetFifo(fifo[i].get()); + // create fifo structure + try { + fifo = sls::make_unique( 0, + (generalData->imageSize) + (generalData->fifoBufferHeaderSize), + fifoDepth); + } catch (...) { + fifoDepth = 0; + throw sls::RuntimeError("Could not allocate memory for fifo structure " + ". FifoDepth is now 0."); } + // set the listener & dataprocessor threads to point to the right fifo + listener->SetFifo(fifo.get()); + dataProcessor->SetFifo(fifo.get()); + if (dataStreamEnable) + dataStreamer->SetFifo(fifo.get()); - LOG(logINFO) << "Memory Allocated Per Fifo: " + LOG(logINFO) << "Memory Allocated: " << (double)(((size_t)(generalData->imageSize) + (size_t)(generalData->fifoBufferHeaderSize)) * (size_t)fifoDepth) / (double)(1024 * 1024) << " MB"; - LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed"; + LOG(logINFO) << " Fifo structure(s) reconstructed"; } @@ -260,7 +246,6 @@ void Implementation::setDetectorType(const detectorType d) { default: break; } - numThreads = generalData->threadsPerReceiver; fifoDepth = generalData->defaultFifoDepth; udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; framesPerFile = generalData->maxFramesPerFile; @@ -268,36 +253,29 @@ void Implementation::setDetectorType(const detectorType d) { SetLocalNetworkParameters(); SetupFifoStructure(); - // create threads - for (int i = 0; i < numThreads; ++i) { - - try { - auto fifo_ptr = fifo[i].get(); - listener.push_back(sls::make_unique( - i, myDetectorType, fifo_ptr, &status, &udpPortNum[i], ð[i], - &numberOfTotalFrames, &dynamicRange, &udpSocketBufferSize, - &actualUDPSocketBufferSize, &framesPerFile, &frameDiscardMode, - &activated, &deactivatedPaddingEnable, &silentMode)); - dataProcessor.push_back(sls::make_unique( - i, myDetectorType, fifo_ptr, &fileFormatType, fileWriteEnable, - &masterFileWriteEnable, &dataStreamEnable, - &dynamicRange, &streamingFrequency, &streamingTimerInMs, - &framePadding, &activated, &deactivatedPaddingEnable, - &silentMode, &quadEnable, &ctbDbitList, &ctbDbitOffset, - &ctbAnalogDataBytes)); - } catch (...) { - listener.clear(); - dataProcessor.clear(); - throw sls::RuntimeError("Could not create listener/dataprocessor threads (index:" + std::to_string(i) + ")"); - } + try { + auto fifo_ptr = fifo.get(); + listener = sls::make_unique( + 0, myDetectorType, fifo_ptr, &status, &udpPortNum, ð, + &numberOfTotalFrames, &dynamicRange, &udpSocketBufferSize, + &actualUDPSocketBufferSize, &framesPerFile, &frameDiscardMode, + &activated, &deactivatedPaddingEnable, &silentMode); + dataProcessor = sls::make_unique( + 0, myDetectorType, fifo_ptr, &fileFormatType, fileWriteEnable, + &masterFileWriteEnable, &dataStreamEnable, + &dynamicRange, &streamingFrequency, &streamingTimerInMs, + &framePadding, &activated, &deactivatedPaddingEnable, + &silentMode, &quadEnable, &ctbDbitList, &ctbDbitOffset, + &ctbAnalogDataBytes); + } catch (...) { + listener.reset(); + dataProcessor.reset(); + throw sls::RuntimeError("Could not create listener/dataprocessor threads"); } // set up writer and callbacks - - for (const auto &it : listener) - it->SetGeneralData(generalData); - for (const auto &it : dataProcessor) - it->SetGeneralData(generalData); + listener->SetGeneralData(generalData); + dataProcessor->SetGeneralData(generalData); SetThreadPriorities(); LOG(logDEBUG) << " Detector type set to " << sls::ToString(d); @@ -310,33 +288,39 @@ int *Implementation::getMultiDetectorSize() const { void Implementation::setDetectorSize(const int *size) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - std::string log_message = "Detector Size (ports): ("; - for (int i = 0; i < MAX_DIMENSIONS; ++i) { - // x dir (colums) each udp port - if (myDetectorType == EIGER && i == X) - numDet[i] = size[i] * 2; - // y dir (rows) each udp port - else if (numUDPInterfaces == 2 && i == Y) - numDet[i] = size[i] * 2; - else - numDet[i] = size[i]; - log_message += std::to_string(numDet[i]); - if (i < MAX_DIMENSIONS - 1) - log_message += ", "; + numDet[X] = size[X]; + numDet[Y] = size[Y]; + numRx[X] = numDet[X]; + numRx[Y] = numDet[Y]; + + // calculating receivers shape + switch (myDetectorType) { + case EIGER: + if (quadEnable) { + numRx[X] = 1; + numRx[Y] = 2; + } else { + numRx[X] = numDet[X] * 2; + } + break; + case JUNGFRAU: + if (numUDPInterfaces == 2) { + numRx[Y] = numDet[X] * 2; + } else { + numRx[Y] = numDet[X]; + } + break; + default: + break; } - log_message += ")"; + if (dataStreamEnable) + dataStreamer->SetReceiverShape(numRx); + setDetectorPositionId(detID); - int nd[2] = {numDet[0], numDet[1]}; - if (quadEnable) { - nd[0] = 1; - nd[1] = 2; - } - for (const auto &it : dataStreamer) { - it->SetNumberofDetectors(nd); - } + LOG(logINFO) << "Receiver Shape: (" << numRx[X] << ", " + << numRx[Y] << ")"; +} - LOG(logINFO) << log_message; -} int Implementation::getDetectorPositionId() const { LOG(logDEBUG3) << __SHORT_AT__ << " called"; @@ -352,20 +336,19 @@ void Implementation::setDetectorPositionId(const int id) { streamingPort = DEFAULT_ZMQ_RX_PORTNO + (detID * (myDetectorType == EIGER ? 2 : 1)); - for (unsigned int i = 0; i < dataProcessor.size(); ++i) { - dataProcessor[i]->SetupFileWriter( - fileWriteEnable, (int *)numDet, &framesPerFile, &fileName, &filePath, - &fileIndex, &overwriteEnable, &detID, &numThreads, &numberOfTotalFrames, - &dynamicRange, &udpPortNum[i], generalData); - } - assert(numDet[1] != 0); - for (unsigned int i = 0; i < listener.size(); ++i) { - uint16_t row = 0, col = 0; - row = (detID % numDet[1]) * ((numUDPInterfaces == 2) ? 2 : 1); // row - col = (detID / numDet[1]) * ((myDetectorType == EIGER) ? 2 : 1) + - i; // col for horiz. udp ports - listener[i]->SetHardCodedPosition(row, col); - } + dataProcessor->SetupFileWriter( + fileWriteEnable, (int *)numRx, &framesPerFile, &fileName, &filePath, + &fileIndex, &overwriteEnable, &detID, &numUDPInterfaces, &numberOfTotalFrames, + &dynamicRange, &udpPortNum, generalData); + + assert(numRx[1] != 0); + uint16_t row = 0, col = 0; + + row = (detID % numRx[1]) * ((myDetectorType == JUNGFRAU && + numUDPInterfaces == 2) ? 2 : 1); // row + col = (detID / numRx[1]) * ((myDetectorType == EIGER) ? 2 : 1) + + interfaceId; // col for horiz. udp ports + listener->SetHardCodedPosition(row, col); } std::string Implementation::getDetectorHostname() const { @@ -458,8 +441,7 @@ void Implementation::setFileFormat(const fileFormat f) { break; } - for (const auto &it : dataProcessor) - it->SetFileFormat(f); + dataProcessor->SetFileFormat(f); LOG(logINFO) << "File Format: " << sls::ToString(fileFormatType); } @@ -512,12 +494,10 @@ bool Implementation::getFileWriteEnable() const { void Implementation::setFileWriteEnable(const bool b) { if (fileWriteEnable != b) { fileWriteEnable = b; - for (unsigned int i = 0; i < dataProcessor.size(); ++i) { - dataProcessor[i]->SetupFileWriter( - fileWriteEnable, (int *)numDet, &framesPerFile, &fileName, - &filePath, &fileIndex, &overwriteEnable, &detID, &numThreads, - &numberOfTotalFrames, &dynamicRange, &udpPortNum[i], generalData); - } + dataProcessor->SetupFileWriter( + fileWriteEnable, (int *)numRx, &framesPerFile, &fileName, + &filePath, &fileIndex, &overwriteEnable, &detID, &numUDPInterfaces, + &numberOfTotalFrames, &dynamicRange, &udpPortNum, generalData); } LOG(logINFO) << "File Write Enable: " << (fileWriteEnable ? "enabled" : "disabled"); @@ -571,67 +551,40 @@ slsDetectorDefs::runStatus Implementation::getStatus() const { } uint64_t Implementation::getFramesCaught() const { - uint64_t min = -1; - uint32_t flagsum = 0; - - for (const auto &it : dataProcessor) { - flagsum += it->GetStartedFlag(); - uint64_t curr = it->GetNumFramesCaught(); - min = curr < min ? curr : min; - } // no data processed - if (flagsum != dataProcessor.size()) + if (!dataProcessor->GetStartedFlag()) { return 0; - - return min; + } + return dataProcessor->GetNumFramesCaught(); } uint64_t Implementation::getAcquisitionIndex() const { - uint64_t min = -1; - uint32_t flagsum = 0; - - for (const auto &it : dataProcessor) { - flagsum += it->GetStartedFlag(); - uint64_t curr = it->GetCurrentFrameIndex(); - min = curr < min ? curr : min; - } // no data processed - if (flagsum != dataProcessor.size()) + if (!dataProcessor->GetStartedFlag()) { return 0; - - return min; + } + return dataProcessor->GetCurrentFrameIndex(); } int Implementation::getProgress() const { - // get minimum of processed frame indices - uint64_t currentFrameIndex = -1; - uint32_t flagsum = 0; - - for (const auto &it : dataProcessor) { - flagsum += it->GetStartedFlag(); - uint64_t curr = it->GetProcessedIndex(); - currentFrameIndex = curr < currentFrameIndex ? curr : currentFrameIndex; + uint64_t currentFrameIndex = 0; + // data processed + if (dataProcessor->GetStartedFlag()) { + currentFrameIndex = dataProcessor->GetProcessedIndex(); } - // no data processed - if (flagsum != dataProcessor.size()) { - currentFrameIndex = -1; - } - - return (100.00 * ((double)(currentFrameIndex + 1) / (double)numberOfTotalFrames)); + return (100.00 * ((double)(currentFrameIndex) / (double)numberOfTotalFrames)); } -std::vector Implementation::getNumMissingPackets() const { - std::vector mp(numThreads); - for (int i = 0; i < numThreads; i++) { - int np = generalData->packetsPerFrame; - uint64_t totnp = np; - // partial readout - if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) { - totnp = ((numLinesReadout * np) / MAX_EIGER_ROWS_PER_READOUT); - } - totnp *= numberOfTotalFrames; - mp[i] = listener[i]->GetNumMissingPacket(stoppedFlag, totnp); - } +uint64_t Implementation::getNumMissingPackets() const { + uint64_t mp = 0; + int np = generalData->packetsPerFrame; + uint64_t totnp = np; + // partial readout + if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) { + totnp = ((numLinesReadout * np) / MAX_EIGER_ROWS_PER_READOUT); + } + totnp *= numberOfTotalFrames; + mp = listener->GetNumMissingPacket(stoppedFlag, totnp); return mp; } @@ -642,7 +595,7 @@ void Implementation::startReceiver() { ResetParametersforNewAcquisition(); // listener - CreateUDPSockets(); + CreateUDPSocket(); // callbacks if (startAcquisitionCallBack) { @@ -688,69 +641,54 @@ void Implementation::stopReceiver() { bool running = true; while (running) { running = false; - for (const auto &it : listener) - if (it->IsRunning()) - running = true; - - for (const auto &it : dataProcessor) - if (it->IsRunning()) - running = true; + if (listener->IsRunning()) + running = true; + if (dataProcessor->IsRunning()) + running = true; usleep(5000); } // create virtual file - if (fileWriteEnable && fileFormatType == HDF5) { - uint64_t maxIndexCaught = 0; - bool anycaught = false; - for (const auto &it : dataProcessor) { - maxIndexCaught = - std::max(maxIndexCaught, it->GetProcessedIndex()); - if (it->GetStartedFlag()) - anycaught = true; - } + if (fileWriteEnable && fileFormatType == HDF5 && interfaceId == 0) { // to create virtual file & set files/acquisition to 0 (only hdf5 at the // moment) - dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught); + dataProcessor->EndofAcquisition( + dataProcessor->GetStartedFlag(), + dataProcessor->GetProcessedIndex()); } // wait for the processes (dataStreamer) to be done - running = true; - while (running) { - running = false; - for (const auto &it : dataStreamer) - if (it->IsRunning()) - running = true; - usleep(5000); + if (dataStreamEnable) { + running = true; + while (running) { + running = dataStreamer->IsRunning(); + usleep(5000); + } } status = RUN_FINISHED; LOG(logINFO) << "Status: " << sls::ToString(status); { // statistics - std::vector mp = getNumMissingPackets(); - uint64_t tot = 0; - for (int i = 0; i < numThreads; i++) { - int nf = dataProcessor[i]->GetNumFramesCaught(); - tot += nf; + uint64_t mp = getNumMissingPackets(); + uint64_t tot = dataProcessor->GetNumFramesCaught(); - TLogLevel lev = - (((int64_t)mp[i]) > 0) ? logINFORED : logINFOGREEN; - LOG(lev) << - // udp port number could be the second if selected interface is - // 2 for jungfrau - "Summary of Port " << udpPortNum[i] - << "\n\tMissing Packets\t\t: " << mp[i] - << "\n\tComplete Frames\t\t: " << nf - << "\n\tLast Frame Caught\t: " - << listener[i]->GetLastFrameIndexCaught(); - } + TLogLevel lev = + (((int64_t)mp) > 0) ? logINFORED : logINFOGREEN; + LOG(lev) << + // udp port number could be the second if selected interface is + // 2 for jungfrau + "Summary of Port " << udpPortNum + << "\n\tMissing Packets\t\t: " << mp + << "\n\tComplete Frames\t\t: " << tot + << "\n\tLast Frame Caught\t: " + << listener->GetLastFrameIndexCaught(); if (!activated) { LOG(logINFORED) << "Deactivated Receiver"; } // callback if (acquisitionFinishedCallBack) - acquisitionFinishedCallBack((tot / numThreads), - pAcquisitionFinished); + acquisitionFinishedCallBack(tot, pAcquisitionFinished); } // change status @@ -764,14 +702,12 @@ void Implementation::startReadout() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; if (status == RUNNING) { // wait for incoming delayed packets - int totalPacketsReceived = 0; + int totalPacketsReceived = listener->GetPacketsCaught(); int previousValue = -1; - for (const auto &it : listener) - totalPacketsReceived += it->GetPacketsCaught(); // wait for all packets const int numPacketsToReceive = - numberOfTotalFrames * generalData->packetsPerFrame * listener.size(); + numberOfTotalFrames * generalData->packetsPerFrame; if (totalPacketsReceived != numPacketsToReceive) { while (totalPacketsReceived != previousValue) { LOG(logDEBUG3) @@ -780,9 +716,7 @@ void Implementation::startReadout() { << " totalPacketsReceived: " << totalPacketsReceived; usleep(5 * 1000); /* TODO! Need to find optimal time **/ previousValue = totalPacketsReceived; - totalPacketsReceived = 0; - for (const auto &it : listener) - totalPacketsReceived += it->GetPacketsCaught(); + totalPacketsReceived = listener->GetPacketsCaught(); LOG(logDEBUG3) << "\tupdated: totalPacketsReceived:" << totalPacketsReceived; @@ -791,68 +725,58 @@ void Implementation::startReadout() { status = TRANSMITTING; LOG(logINFO) << "Status: Transmitting"; } - // shut down udp sockets to make listeners push dummy (end) packets for + // shut down udp socket to make listeners push dummy (end) packets for // processors - shutDownUDPSockets(); + shutDownUDPSocket(); } -void Implementation::shutDownUDPSockets() { +void Implementation::shutDownUDPSocket() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - for (const auto &it : listener) - it->ShutDownUDPSocket(); + listener->ShutDownUDPSocket(); } void Implementation::closeFiles() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - uint64_t maxIndexCaught = 0; - bool anycaught = false; - for (const auto &it : dataProcessor) { - it->CloseFiles(); - maxIndexCaught = - std::max(maxIndexCaught, it->GetProcessedIndex()); - if (it->GetStartedFlag()) - anycaught = true; - } + dataProcessor->CloseFiles(); // to create virtual file & set files/acquisition to 0 (only hdf5 at the // moment) - dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught); + if (interfaceId == 0) { + dataProcessor->EndofAcquisition( + dataProcessor->GetStartedFlag(), + dataProcessor->GetProcessedIndex()); + } } void Implementation::restreamStop() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - for (const auto &it : dataStreamer) { - it->RestreamStop(); + if (dataStreamEnable) { + dataStreamer->RestreamStop(); + LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful"; } - LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful"; } void Implementation::ResetParametersforNewAcquisition() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - for (const auto &it : listener) - it->ResetParametersforNewAcquisition(); - for (const auto &it : dataProcessor) - it->ResetParametersforNewAcquisition(); + listener->ResetParametersforNewAcquisition(); + dataProcessor->ResetParametersforNewAcquisition(); if (dataStreamEnable) { std::ostringstream os; os << filePath << '/' << fileName; std::string fnametostream = os.str(); - for (const auto &it : dataStreamer) - it->ResetParametersforNewAcquisition(fnametostream); + dataStreamer->ResetParametersforNewAcquisition(fnametostream); } } -void Implementation::CreateUDPSockets() { +void Implementation::CreateUDPSocket() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; try{ - for (unsigned int i = 0; i < listener.size(); ++i) { - listener[i]->CreateUDPSockets(); - } + listener->CreateUDPSocket(); } catch(const sls::RuntimeError &e) { - shutDownUDPSockets(); - throw sls::RuntimeError("Could not create UDP Socket(s)."); + shutDownUDPSocket(); + throw sls::RuntimeError("Could not create UDP Socket."); } LOG(logDEBUG) << "UDP socket(s) created successfully."; @@ -886,11 +810,9 @@ void Implementation::SetupWriter() { } try { - for (unsigned int i = 0; i < dataProcessor.size(); ++i) { - dataProcessor[i]->CreateNewFile(attr); - } + dataProcessor->CreateNewFile(attr); } catch(const sls::RuntimeError &e) { - shutDownUDPSockets(); + shutDownUDPSocket(); closeFiles(); throw sls::RuntimeError("Could not create file."); } @@ -900,17 +822,13 @@ void Implementation::StartRunning() { LOG(logDEBUG3) << __SHORT_AT__ << " called"; // set running mask and post semaphore to start the inner loop in execution // thread - for (const auto &it : listener) { - it->StartRunning(); - it->Continue(); - } - for (const auto &it : dataProcessor) { - it->StartRunning(); - it->Continue(); - } - for (const auto &it : dataStreamer) { - it->StartRunning(); - it->Continue(); + listener->StartRunning(); + listener->Continue(); + dataProcessor->StartRunning(); + dataProcessor->Continue(); + if (dataStreamEnable) { + dataStreamer->StartRunning(); + dataStreamer->Continue(); } } @@ -930,100 +848,12 @@ void Implementation::setNumberofUDPInterfaces(const int n) { if (numUDPInterfaces != n) { - // reduce number of detectors in y dir (rows) if it had 2 interfaces - // before - if (numUDPInterfaces == 2) - numDet[Y] /= 2; - numUDPInterfaces = n; - - // clear all threads and fifos - listener.clear(); - dataProcessor.clear(); - dataStreamer.clear(); - fifo.clear(); - - // set local variables generalData->SetNumberofInterfaces(n); - numThreads = generalData->threadsPerReceiver; udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; - // fifo SetupFifoStructure(); - - // create threads - for (int i = 0; i < numThreads; ++i) { - // listener and dataprocessor threads - try { - auto fifo_ptr = fifo[i].get(); - listener.push_back(sls::make_unique( - i, myDetectorType, fifo_ptr, &status, &udpPortNum[i], - ð[i], &numberOfTotalFrames, &dynamicRange, - &udpSocketBufferSize, &actualUDPSocketBufferSize, - &framesPerFile, &frameDiscardMode, &activated, - &deactivatedPaddingEnable, &silentMode)); - listener[i]->SetGeneralData(generalData); - - dataProcessor.push_back(sls::make_unique( - i, myDetectorType, fifo_ptr, &fileFormatType, - fileWriteEnable, &masterFileWriteEnable, &dataStreamEnable, - &dynamicRange, &streamingFrequency, - &streamingTimerInMs, &framePadding, &activated, - &deactivatedPaddingEnable, &silentMode, &quadEnable, &ctbDbitList, - &ctbDbitOffset, &ctbAnalogDataBytes)); - dataProcessor[i]->SetGeneralData(generalData); - } catch (...) { - listener.clear(); - dataProcessor.clear(); - throw sls::RuntimeError("Could not create listener/dataprocessor threads (index:" + std::to_string(i) + ")"); - } - // streamer threads - if (dataStreamEnable) { - try { - int fd = flippedDataX; - int nd[2] = {numDet[0], numDet[1]}; - if (quadEnable) { - fd = i; - nd[0] = 1; - nd[1] = 2; - } - dataStreamer.push_back(sls::make_unique( - i, fifo[i].get(), &dynamicRange, &roi, &fileIndex, - fd, (int*)nd, &quadEnable, &numberOfTotalFrames)); - dataStreamer[i]->SetGeneralData(generalData); - dataStreamer[i]->CreateZmqSockets( - &numThreads, streamingPort, streamingSrcIP); - dataStreamer[i]->SetAdditionalJsonHeader(additionalJsonHeader); - - } catch (...) { - if (dataStreamEnable) { - dataStreamer.clear(); - dataStreamEnable = false; - } - throw sls::RuntimeError("Could not create datastreamer threads (index:" + std::to_string(i) + ")"); - } - } - } - - SetThreadPriorities(); - - // update (from 1 to 2 interface) & also for printout setDetectorSize(numDet); - // update row and column in dataprocessor - setDetectorPositionId(detID); - - // update call backs - if (rawDataReadyCallBack) { - for (const auto &it : dataProcessor) - it->registerCallBackRawDataReady(rawDataReadyCallBack, - pRawDataReady); - } - if (rawDataModifyReadyCallBack) { - for (const auto &it : dataProcessor) - it->registerCallBackRawDataModifyReady( - rawDataModifyReadyCallBack, pRawDataReady); - } - // test socket buffer size with current set up setUDPSocketBufferSize(0); } @@ -1031,52 +861,40 @@ void Implementation::setNumberofUDPInterfaces(const int n) { LOG(logINFO) << "Number of Interfaces: " << numUDPInterfaces; } +int Implementation::getInterfaceId() const { + LOG(logDEBUG3) << __SHORT_AT__ << " called"; + return interfaceId; +} + +void Implementation::setInterfaceId(const int i) { + LOG(logDEBUG3) << __SHORT_AT__ << " called"; + + interfaceId = i; + LOG(logINFO) << "Interface Id: " << interfaceId; +} + std::string Implementation::getEthernetInterface() const { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - return eth[0]; + return eth; } void Implementation::setEthernetInterface(const std::string &c) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - eth[0] = c; - LOG(logINFO) << "Ethernet Interface: " << eth[0]; -} - -std::string Implementation::getEthernetInterface2() const { - LOG(logDEBUG3) << __SHORT_AT__ << " called"; - return eth[1]; -} - -void Implementation::setEthernetInterface2(const std::string &c) { - LOG(logDEBUG3) << __SHORT_AT__ << " called"; - - eth[1] = c; - LOG(logINFO) << "Ethernet Interface 2: " << eth[1]; + eth = c; + LOG(logINFO) << "Ethernet Interface: " << eth; } uint32_t Implementation::getUDPPortNumber() const { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - return udpPortNum[0]; + return udpPortNum; } void Implementation::setUDPPortNumber(const uint32_t i) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; - udpPortNum[0] = i; - LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0]; -} - -uint32_t Implementation::getUDPPortNumber2() const { - LOG(logDEBUG3) << __SHORT_AT__ << " called"; - return udpPortNum[1]; -} - -void Implementation::setUDPPortNumber2(const uint32_t i) { - LOG(logDEBUG3) << __SHORT_AT__ << " called"; - - udpPortNum[1] = i; - LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; + udpPortNum = i; + LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum; } int64_t Implementation::getUDPSocketBufferSize() const { @@ -1086,15 +904,7 @@ int64_t Implementation::getUDPSocketBufferSize() const { void Implementation::setUDPSocketBufferSize(const int64_t s) { int64_t size = (s == 0) ? udpSocketBufferSize : s; - size_t listSize = listener.size(); - - if (myDetectorType == JUNGFRAU && (int)listSize != numUDPInterfaces) { - throw sls::RuntimeError("Number of Interfaces " + std::to_string(numUDPInterfaces) + " do not match listener size " + std::to_string(listSize)); - } - - for (unsigned int i = 0; i < listSize; ++i) { - listener[i]->CreateDummySocketForUDPSocketBufferSize(size); - } + listener->CreateDummySocketForUDPSocketBufferSize(size); } int64_t Implementation::getActualUDPSocketBufferSize() const { @@ -1119,30 +929,25 @@ void Implementation::setDataStreamEnable(const bool enable) { dataStreamEnable = enable; // data sockets have to be created again as the client ones are - dataStreamer.clear(); + dataStreamer.reset(); if (enable) { - for (int i = 0; i < numThreads; ++i) { - try { - int fd = flippedDataX; - int nd[2] = {numDet[0], numDet[1]}; - if (quadEnable) { - fd = i; - nd[0] = 1; - nd[1] = 2; - } - dataStreamer.push_back(sls::make_unique( - i, fifo[i].get(), &dynamicRange, &roi, &fileIndex, - fd, (int*)nd, &quadEnable, &numberOfTotalFrames)); - dataStreamer[i]->SetGeneralData(generalData); - dataStreamer[i]->CreateZmqSockets( - &numThreads, streamingPort, streamingSrcIP); - dataStreamer[i]->SetAdditionalJsonHeader(additionalJsonHeader); - } catch (...) { - dataStreamer.clear(); - dataStreamEnable = false; - throw sls::RuntimeError("Could not set data stream enable."); + try { + int fd = flippedDataX; + if (quadEnable) { + fd = interfaceId; } + dataStreamer = sls::make_unique( + 0, fifo.get(), &dynamicRange, &roi, &fileIndex, + fd, (int*)numRx, &quadEnable, &numberOfTotalFrames); + dataStreamer->SetGeneralData(generalData); + dataStreamer->CreateZmqSockets( + &numUDPInterfaces, streamingPort, streamingSrcIP); + dataStreamer->SetAdditionalJsonHeader(additionalJsonHeader); + } catch (...) { + dataStreamer.reset(); + dataStreamEnable = false; + throw sls::RuntimeError("Could not set data stream enable."); } SetThreadPriorities(); } @@ -1204,9 +1009,9 @@ std::map Implementation::getAdditionalJsonHeader() con void Implementation::setAdditionalJsonHeader(const std::map &c) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; additionalJsonHeader = c; - for (const auto &it : dataStreamer) { - it->SetAdditionalJsonHeader(c); - } + if (dataStreamEnable) { + dataStreamer->SetAdditionalJsonHeader(c); + } LOG(logINFO) << "Additional JSON Header: " << sls::ToString(additionalJsonHeader); } @@ -1240,9 +1045,9 @@ void Implementation::setAdditionalJsonParameter(const std::string &key, const st additionalJsonHeader[key] = value; LOG(logINFO) << "Adding additional json parameter (" << key << ") to " << value; } - for (const auto &it : dataStreamer) { - it->SetAdditionalJsonHeader(additionalJsonHeader); - } + if (dataStreamEnable) { + dataStreamer->SetAdditionalJsonHeader(additionalJsonHeader); + } LOG(logINFO) << "Additional JSON Header: " << sls::ToString(additionalJsonHeader); } @@ -1409,8 +1214,7 @@ void Implementation::setNumberofAnalogSamples(const uint32_t i) { numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, readoutType); - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } LOG(logINFO) << "Number of Analog Samples: " << numberOfAnalogSamples; @@ -1432,8 +1236,7 @@ void Implementation::setNumberofDigitalSamples(const uint32_t i) { numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, readoutType); - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } LOG(logINFO) << "Number of Digital Samples: " @@ -1454,8 +1257,7 @@ void Implementation::setNumberofCounters(const int i) { if (myDetectorType == MYTHEN3) { generalData->SetNumberofCounters(i, dynamicRange); // to update npixelsx, npixelsy in file writer - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } } @@ -1475,8 +1277,7 @@ void Implementation::setDynamicRange(const uint32_t i) { if (myDetectorType == EIGER || myDetectorType == MYTHEN3) { generalData->SetDynamicRange(i, tengigaEnable); // to update npixelsx, npixelsy in file writer - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); fifoDepth = generalData->defaultFifoDepth; SetupFifoStructure(); } @@ -1497,8 +1298,7 @@ void Implementation::setROI(slsDetectorDefs::ROI arg) { // only for gotthard generalData->SetROI(arg); framesPerFile = generalData->maxFramesPerFile; - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } @@ -1547,17 +1347,13 @@ void Implementation::setFlippedDataX(int enable) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; flippedDataX = (enable == 0) ? 0 : 1; - if (!quadEnable) { - for (const auto &it : dataStreamer) { - it->SetFlippedDataX(flippedDataX); - } - } - else { - if (dataStreamer.size() == 2) { - dataStreamer[0]->SetFlippedDataX(0); - dataStreamer[1]->SetFlippedDataX(1); - } - } + if (dataStreamEnable) { + if (!quadEnable) { + dataStreamer->SetFlippedDataX(flippedDataX); + } else { + dataStreamer->SetFlippedDataX(interfaceId); + } + } LOG(logINFO) << "Flipped Data X: " << flippedDataX; } @@ -1571,21 +1367,16 @@ void Implementation::setQuad(const bool b) { if (quadEnable != b) { quadEnable = b; - if (!quadEnable) { - for (const auto &it : dataStreamer) { - it->SetNumberofDetectors(numDet); - it->SetFlippedDataX(flippedDataX); - } - } else { - int size[2] = {1, 2}; - for (const auto &it : dataStreamer) { - it->SetNumberofDetectors(size); - } - if (dataStreamer.size() == 2) { - dataStreamer[0]->SetFlippedDataX(0); - dataStreamer[1]->SetFlippedDataX(1); - } - } + if (dataStreamEnable) { + if (!quadEnable) { + dataStreamer->SetReceiverShape(numRx); + dataStreamer->SetFlippedDataX(flippedDataX); + } else { + int size[2] = {1, 2}; + dataStreamer->SetReceiverShape(size); + dataStreamer->SetFlippedDataX(interfaceId); + } + } } LOG(logINFO) << "Quad Enable: " << quadEnable; } @@ -1640,8 +1431,7 @@ void Implementation::setReadoutMode(const readoutMode f) { tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, readoutType); - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } @@ -1664,8 +1454,7 @@ void Implementation::setADCEnableMask(uint32_t mask) { numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, readoutType); - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } @@ -1689,8 +1478,7 @@ void Implementation::setTenGigaADCEnableMask(uint32_t mask) { numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, readoutType); - for (const auto &it : dataProcessor) - it->SetPixelDimension(); + dataProcessor->SetPixelDimension(); SetupFifoStructure(); } @@ -1742,16 +1530,14 @@ void Implementation::registerCallBackRawDataReady( void (*func)(char *, char *, uint32_t, void *), void *arg) { rawDataReadyCallBack = func; pRawDataReady = arg; - for (const auto &it : dataProcessor) - it->registerCallBackRawDataReady(rawDataReadyCallBack, pRawDataReady); + dataProcessor->registerCallBackRawDataReady(rawDataReadyCallBack, pRawDataReady); } void Implementation::registerCallBackRawDataModifyReady( void (*func)(char *, char *, uint32_t &, void *), void *arg) { rawDataModifyReadyCallBack = func; pRawDataReady = arg; - for (const auto &it : dataProcessor) - it->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack, + dataProcessor->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack, pRawDataReady); } diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index ae384dbfe..37f8ada6f 100755 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -76,12 +76,12 @@ class Implementation : private virtual slsDetectorDefs { uint64_t getFramesCaught() const; uint64_t getAcquisitionIndex() const; int getProgress() const; - std::vector getNumMissingPackets() const; + uint64_t getNumMissingPackets() const; void startReceiver(); void setStoppedFlag(bool stopped); void stopReceiver(); void startReadout(); - void shutDownUDPSockets(); + void shutDownUDPSocket(); void closeFiles(); void restreamStop(); @@ -91,19 +91,15 @@ class Implementation : private virtual slsDetectorDefs { * Network Configuration (UDP) * * * * ************************************************/ + int getInterfaceId() const; + void setInterfaceId(const int value); int getNumberofUDPInterfaces() const; /* [Jungfrau] */ void setNumberofUDPInterfaces(const int n); std::string getEthernetInterface() const; void setEthernetInterface(const std::string &c); - std::string getEthernetInterface2() const; - /* [Jungfrau] */ - void setEthernetInterface2(const std::string &c); uint32_t getUDPPortNumber() const; void setUDPPortNumber(const uint32_t i); - uint32_t getUDPPortNumber2() const; - /* [Eiger][Jungfrau] */ - void setUDPPortNumber2(const uint32_t i); int64_t getUDPSocketBufferSize() const; void setUDPSocketBufferSize(const int64_t s); int64_t getActualUDPSocketBufferSize() const; @@ -226,7 +222,7 @@ class Implementation : private virtual slsDetectorDefs { void SetupFifoStructure(); void ResetParametersforNewAcquisition(); - void CreateUDPSockets(); + void CreateUDPSocket(); void SetupWriter(); void StartRunning(); @@ -238,9 +234,9 @@ class Implementation : private virtual slsDetectorDefs { * ************************************************/ // config parameters - int numThreads; detectorType myDetectorType; int numDet[MAX_DIMENSIONS]; + int numRx[MAX_DIMENSIONS]; int detID; std::string detHostname; bool silentMode; @@ -263,9 +259,10 @@ class Implementation : private virtual slsDetectorDefs { bool stoppedFlag; // network configuration (UDP) + int interfaceId; int numUDPInterfaces; - std::vector eth; - std::vector udpPortNum; + std::string eth; + uint32_t udpPortNum; int64_t udpSocketBufferSize; int64_t actualUDPSocketBufferSize; @@ -318,8 +315,8 @@ class Implementation : private virtual slsDetectorDefs { // class objects GeneralData *generalData; - std::vector> listener; - std::vector> dataProcessor; - std::vector> dataStreamer; - std::vector> fifo; + std::unique_ptr listener; + std::unique_ptr dataProcessor; + std::unique_ptr dataStreamer; + std::unique_ptr fifo; }; diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index ca9761ca3..f7dd2a6bf 100755 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -112,7 +112,7 @@ void Listener::SetGeneralData(GeneralData* g) { } -void Listener::CreateUDPSockets() { +void Listener::CreateUDPSocket() { if (!(*activated)) { return; } diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 3011909a3..a1de4ae23 100755 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -83,9 +83,9 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { void SetGeneralData(GeneralData* g); /** - * Creates UDP Sockets + * Creates UDP Socket */ - void CreateUDPSockets(); + void CreateUDPSocket(); /** * Shuts down and deletes UDP Sockets diff --git a/slsSupportLib/include/ZmqSocket.h b/slsSupportLib/include/ZmqSocket.h index 666c9e7ea..17090cca1 100755 --- a/slsSupportLib/include/ZmqSocket.h +++ b/slsSupportLib/include/ZmqSocket.h @@ -26,10 +26,10 @@ struct zmqHeader { uint32_t jsonversion{0}; uint32_t dynamicRange{0}; uint64_t fileIndex{0}; - /** number of detectors in x axis */ - uint32_t ndetx{0}; - /** number of detectors in y axis */ - uint32_t ndety{0}; + /** number of sockets in x axis */ + uint32_t nSocketX{0}; + /** number of sockets in y axis */ + uint32_t nSocketY{0}; /** number of pixels/channels in x axis for this zmq socket */ uint32_t npixelsx{0}; /** number of pixels/channels in y axis for this zmq socket */ diff --git a/slsSupportLib/include/sls_detector_funcs.h b/slsSupportLib/include/sls_detector_funcs.h index 7113bf51c..219058a43 100755 --- a/slsSupportLib/include/sls_detector_funcs.h +++ b/slsSupportLib/include/sls_detector_funcs.h @@ -285,9 +285,7 @@ enum detFuncs{ F_SET_RECEIVER_QUAD, F_SET_RECEIVER_READ_N_LINES, F_SET_RECEIVER_UDP_IP, - F_SET_RECEIVER_UDP_IP2, F_SET_RECEIVER_UDP_PORT, - F_SET_RECEIVER_UDP_PORT2, F_SET_RECEIVER_NUM_INTERFACES, F_RECEIVER_SET_ADC_MASK_10G, F_RECEIVER_SET_NUM_COUNTERS, @@ -579,9 +577,7 @@ static const char* getFunctionNameFromEnum(enum detFuncs func) { case F_SET_RECEIVER_QUAD: return "F_SET_RECEIVER_QUAD"; case F_SET_RECEIVER_READ_N_LINES: return "F_SET_RECEIVER_READ_N_LINES"; case F_SET_RECEIVER_UDP_IP: return "F_SET_RECEIVER_UDP_IP"; - case F_SET_RECEIVER_UDP_IP2: return "F_SET_RECEIVER_UDP_IP2"; case F_SET_RECEIVER_UDP_PORT: return "F_SET_RECEIVER_UDP_PORT"; - case F_SET_RECEIVER_UDP_PORT2: return "F_SET_RECEIVER_UDP_PORT2"; case F_SET_RECEIVER_NUM_INTERFACES: return "F_SET_RECEIVER_NUM_INTERFACES"; case F_RECEIVER_SET_ADC_MASK_10G: return "F_RECEIVER_SET_ADC_MASK_10G"; case F_RECEIVER_SET_NUM_COUNTERS: return "F_RECEIVER_SET_NUM_COUNTERS"; diff --git a/slsSupportLib/src/ZmqSocket.cpp b/slsSupportLib/src/ZmqSocket.cpp index 2920dd4b2..98d6a6d91 100644 --- a/slsSupportLib/src/ZmqSocket.cpp +++ b/slsSupportLib/src/ZmqSocket.cpp @@ -187,8 +187,8 @@ int ZmqSocket::SendHeader( header.jsonversion, header.dynamicRange, header.fileIndex, - header.ndetx, - header.ndety, + header.nSocketX, + header.nSocketY, header.npixelsx, header.npixelsy, header.imageSize, @@ -319,8 +319,8 @@ int ZmqSocket::ParseHeader(const int index, int length, char *buff, zHeader.data = ((document["data"].GetUint()) == 0) ? false : true; zHeader.dynamicRange = document["bitmode"].GetUint(); zHeader.fileIndex = document["fileIndex"].GetUint64(); - zHeader.ndetx = document["detshape"][0].GetUint(); - zHeader.ndety = document["detshape"][1].GetUint(); + zHeader.nSocketX = document["detshape"][0].GetUint(); + zHeader.nSocketY = document["detshape"][1].GetUint(); zHeader.npixelsx = document["shape"][0].GetUint(); zHeader.npixelsy = document["shape"][1].GetUint(); zHeader.imageSize = document["size"].GetUint();