slsReceiver:trying to get deactivate to work

This commit is contained in:
2018-08-14 15:06:06 +02:00
parent fa175ac934
commit f0ac49190a
18 changed files with 348 additions and 90 deletions

View File

@@ -27,7 +27,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
fileFormat* ftype, bool fwenable,
bool* dsEnable, bool* gpEnable, uint32_t* dr,
uint32_t* freq, uint32_t* timer,
bool* fp,
bool* fp, bool* act, bool* depaden,
void (*dataReadycb)(char*, char*, uint32_t, void*),
void (*dataModifyReadycb)(char*, char*, uint32_t &, void*),
void *pDataReadycb) :
@@ -48,6 +48,8 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo*& f,
currentFreqCount(0),
tempBuffer(0),
xcoordin1D(0),
activated(act),
deactivatedPaddingEnable(depaden),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
@@ -374,8 +376,10 @@ void DataProcessor::ProcessAnImage(char* buf) {
header.xCoord = xcoordin1D;
}
// frame padding
if (*framePadding && nump < generalData->packetsPerFrame)
// deactivated and padding enabled
if ((!(*activated) && *deactivatedPaddingEnable) ||
// frame padding
(*framePadding && nump < generalData->packetsPerFrame))
PadMissingPackets(buf);
// normal call back

View File

@@ -22,7 +22,7 @@ const string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
uint32_t* portno, char* e, uint64_t* nf, uint32_t* dr,
uint32_t* us, uint32_t* as, uint32_t* fpf,
frameDiscardPolicy* fdp) :
frameDiscardPolicy* fdp, bool* act, bool* depaden) :
ThreadObject(ind),
runningFlag(0),
generalData(0),
@@ -38,6 +38,8 @@ Listener::Listener(int ind, detectorType dtype, Fifo*& f, runStatus* s,
actualUDPSocketBufferSize(as),
framesPerFile(fpf),
frameDiscardMode(fdp),
activated(act),
deactivatedPaddingEnable(depaden),
acquisitionStartedFlag(false),
measurementStartedFlag(false),
firstAcquisitionIndex(0),
@@ -183,6 +185,10 @@ int Listener::SetThreadPriority(int priority) {
int Listener::CreateUDPSockets() {
if (!(*activated)) {
return OK;
}
//if eth is mistaken with ip address
if (strchr(eth,'.') != NULL){
memset(eth, 0, MAX_STR_LENGTH);
@@ -236,6 +242,12 @@ void Listener::SetSilentMode(bool mode) {
int Listener::CreateDummySocketForUDPSocketBufferSize(uint32_t s) {
FILE_LOG(logINFO) << "Testing UDP Socket Buffer size with test port " << *udpPortNumber;
if (!(*activated)) {
*actualUDPSocketBufferSize = (s*2);
return OK;
}
uint32_t temp = *udpSocketBufferSize;
*udpSocketBufferSize = s;
@@ -291,7 +303,7 @@ void Listener::ThreadExecution() {
#endif
//udpsocket doesnt exist
if (!udpSocketAlive && !carryOverFlag) {
if (*activated && !udpSocketAlive && !carryOverFlag) {
//FILE_LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not created or shut down earlier";
(*((uint32_t*)buffer)) = 0;
StopListening(buffer);
@@ -299,7 +311,7 @@ void Listener::ThreadExecution() {
}
//get data
if ((*status != TRANSMITTING && udpSocketAlive) || carryOverFlag) {
if ((*status != TRANSMITTING && (!(*activated) || udpSocketAlive)) || carryOverFlag) {
rc = ListenToAnImage(buffer);
}
@@ -357,6 +369,7 @@ void Listener::StopListening(char* buf) {
/* buf includes the fifo header and packet header */
uint32_t Listener::ListenToAnImage(char* buf) {
int rc = 0;
uint64_t fnum = 0, bid = 0;
uint32_t pnum = 0, snum = 0;
@@ -378,6 +391,20 @@ uint32_t Listener::ListenToAnImage(char* buf) {
/*memset(buf + fifohsize, 0xFF, generalData->imageSize);*/
new_header = (sls_receiver_header*) (buf + FIFO_HEADER_NUMBYTES);
// deactivated (eiger)
if (!(*activated)) {cprintf(RED,"deactivated receiver\n");
// no padding
if (!(*deactivatedPaddingEnable)) {
return 0;}
// padding without setting bitmask (all missing packets padded in dataProcessor)
if (currentFrameIndex >= *numImages)
return 0;
new_header->detHeader.frameNumber = currentFrameIndex+1; //(eiger)
new_header->detHeader.detType = (uint8_t) generalData->myDetectorType;
new_header->detHeader.version = (uint8_t) SLS_DETECTOR_HEADER_VERSION;
return generalData->imageSize;
}
//look for carry over

View File

@@ -58,6 +58,7 @@ void UDPBaseImplementation::initializeMembers(){
//***receiver parameters***
status = IDLE;
activated = true;
deactivatedPaddingEnable = true;
frameDiscardMode = NO_DISCARD;
framePadding = false;
@@ -322,11 +323,16 @@ uint32_t UDPBaseImplementation::getSilentMode() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return silentMode;}
int UDPBaseImplementation::getActivate() const{
bool UDPBaseImplementation::getActivate() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return activated;
}
bool UDPBaseImplementation::getDeactivatedPadding() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return deactivatedPaddingEnable;
}
uint32_t UDPBaseImplementation::getStreamingPort() const{
FILE_LOG(logDEBUG) << __AT__ << " starting";
return streamingPort;
@@ -733,17 +739,20 @@ void UDPBaseImplementation::abort(){
}
int UDPBaseImplementation::setActivate(int enable){
bool UDPBaseImplementation::setActivate(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
if(enable != -1){
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
}
activated = enable;
FILE_LOG(logINFO) << "Activation: " << stringEnable(activated);
return activated;
}
bool UDPBaseImplementation::setDeactivatedPadding(bool enable){
FILE_LOG(logDEBUG) << __AT__ << " starting";
deactivatedPaddingEnable = enable;
FILE_LOG(logINFO) << "Deactivated Padding Enable: " << stringEnable(deactivatedPaddingEnable);
return deactivatedPaddingEnable;
}
void UDPBaseImplementation::setStreamingPort(const uint32_t i) {
streamingPort = i;

View File

@@ -375,13 +375,13 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) {
Listener* l = new Listener(i, myDetectorType, fifo[i], &status,
&udpPortNum[i], eth, &numberOfFrames, &dynamicRange,
&udpSocketBufferSize, &actualUDPSocketBufferSize, &framesPerFile,
&frameDiscardMode);
&frameDiscardMode, &activated, &deactivatedPaddingEnable);
listener.push_back(l);
DataProcessor* p = new DataProcessor(i, myDetectorType, fifo[i], &fileFormatType,
fileWriteEnable, &dataStreamEnable, &gapPixelsEnable,
&dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS,
&framePadding,
&framePadding, &activated, &deactivatedPaddingEnable,
rawDataReadyCallBack, rawDataModifyReadyCallBack, pRawDataReady);
dataProcessor.push_back(p);
}
@@ -573,39 +573,37 @@ void UDPStandardImplementation::stopReceiver(){
void UDPStandardImplementation::startReadout(){
if(status == RUNNING){
//needs to wait for packets only if activated
if(activated){
// wait for incoming delayed packets
//current packets caught
volatile int totalP = 0,prev=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//current packets caught
volatile int totalP = 0,prev=-1;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait for all packets
if((unsigned long long int)totalP!=numberOfFrames*generalData->packetsPerFrame*listener.size()){
//wait as long as there is change from prev totalP,
while(prev != totalP){
//wait as long as there is change from prev totalP,
while(prev != totalP){
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n",
prev,totalP);
cprintf(MAGENTA,"waiting for all packets prevP:%d totalP:%d\n",
prev,totalP);
#endif
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
//usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);usleep(1*1000*1000);
usleep(5*1000);/* Need to find optimal time **/
prev = totalP;
totalP = 0;
prev = totalP;
totalP = 0;
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
for (vector<Listener*>::const_iterator it = listener.begin(); it != listener.end(); ++it)
totalP += (*it)->GetPacketsCaught();
#ifdef VERY_VERBOSE
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
cprintf(MAGENTA,"\tupdated: totalP:%d\n",totalP);
#endif
}
}
}
//set status
status = TRANSMITTING;
FILE_LOG(logINFO) << "Status: Transmitting";

View File

@@ -245,6 +245,7 @@ const char* slsReceiverTCPIPInterface::getFunctionName(enum recFuncs func) {
case F_RECEIVER_CHECK_VERSION: return "F_RECEIVER_CHECK_VERSION";
case F_RECEIVER_DISCARD_POLICY: return "F_RECEIVER_DISCARD_POLICY";
case F_RECEIVER_PADDING_ENABLE: return "F_RECEIVER_PADDING_ENABLE";
case F_RECEIVER_DEACTIVATED_PADDING_ENABLE: return "F_RECEIVER_DEACTIVATED_PADDING_ENABLE";
default: return "Unknown Function";
}
@@ -301,6 +302,8 @@ int slsReceiverTCPIPInterface::function_table(){
flist[F_RECEIVER_CHECK_VERSION] = &slsReceiverTCPIPInterface::check_version_compatibility;
flist[F_RECEIVER_DISCARD_POLICY] = &slsReceiverTCPIPInterface::set_discard_policy;
flist[F_RECEIVER_PADDING_ENABLE] = &slsReceiverTCPIPInterface::set_padding_enable;
flist[F_RECEIVER_DEACTIVATED_PADDING_ENABLE] = &slsReceiverTCPIPInterface::set_deactivated_receiver_padding_enable;
#ifdef VERYVERBOSE
for (int i = 0; i < NUM_REC_FUNCTIONS ; i++) {
@@ -706,6 +709,19 @@ int slsReceiverTCPIPInterface::send_update() {
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// activate
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getActivate();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
// deactivated padding enable
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
ind=(int)receiverBase->getDeactivatedPadding();
#endif
n += mySock->SendDataOnly(&ind,sizeof(ind));
if (!lockStatus)
strcpy(mySock->lastClientIP,mySock->thisClientIP);
@@ -1931,11 +1947,11 @@ int slsReceiverTCPIPInterface::set_activate() {
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setActivate(enable);
receiverBase->setActivate(enable > 0 ? true : false);
}
}
//get
retval = receiverBase->getActivate();
retval = (int)receiverBase->getActivate();
if(enable >= 0 && retval != enable){
ret = FAIL;
sprintf(mess,"Could not set activate to %d, returned %d\n",enable,retval);
@@ -2485,7 +2501,7 @@ int slsReceiverTCPIPInterface::enable_gap_pixels() {
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "Activate: " << retval;
FILE_LOG(logDEBUG1) << "Gap Pixels Enable: " << retval;
#endif
if (ret == OK && mySock->differentClients)
@@ -2895,3 +2911,59 @@ int slsReceiverTCPIPInterface::set_padding_enable() {
int slsReceiverTCPIPInterface::set_deactivated_receiver_padding_enable() {
ret = OK;
memset(mess, 0, sizeof(mess));
int enable = -1;
int retval = -1;
// receive arguments
if (mySock->ReceiveDataOnly(&enable,sizeof(enable)) < 0 )
return printSocketReadError();
if (myDetectorType != EIGER)
functionNotImplemented();
// execute action
#ifdef SLS_RECEIVER_UDP_FUNCTIONS
else {
if (receiverBase == NULL)
invalidReceiverObject();
else {
// set
if(enable >= 0) {
if (mySock->differentClients && lockStatus)
receiverlocked();
else if (receiverBase->getStatus() != IDLE)
receiverNotIdle();
else {
receiverBase->setDeactivatedPadding(enable > 0 ? true : false);
}
}
//get
retval = (int)receiverBase->getDeactivatedPadding();
if(enable >= 0 && retval != enable){
ret = FAIL;
sprintf(mess,"Could not set deactivated padding enable to %d, returned %d\n",enable,retval);
FILE_LOG(logERROR) << mess;
}
}
}
#endif
#ifdef VERYVERBOSE
FILE_LOG(logDEBUG1) << "Deactivated Padding Enable: " << retval;
#endif
if (ret == OK && mySock->differentClients)
ret = FORCE_UPDATE;
// send answer
mySock->SendDataOnly(&ret,sizeof(ret));
if (ret == FAIL)
mySock->SendDataOnly(mess,sizeof(mess));
mySock->SendDataOnly(&retval,sizeof(retval));
// return ok/fail
return ret;
}