diff --git a/slsDetector_bin/ju_udp_receiver_3threads_2_0 b/slsDetector_bin/ju_udp_receiver_3threads_2_0 new file mode 100755 index 0000000..ae95630 Binary files /dev/null and b/slsDetector_bin/ju_udp_receiver_3threads_2_0 differ diff --git a/slsDetector_bin/ju_udp_receiver_3threads_new_header.c b/slsDetector_bin/ju_udp_receiver_3threads_new_header.c new file mode 100644 index 0000000..a9c63f8 --- /dev/null +++ b/slsDetector_bin/ju_udp_receiver_3threads_new_header.c @@ -0,0 +1,613 @@ +/**************************************************************************/ +/* This sample program provides a code for a connectionless server. */ +/**************************************************************************/ + +/**************************************************************************/ +/* Header files needed for this sample program */ +/**************************************************************************/ +#include +#include +#include +#include +#include /* POSIX Threads */ +#include +#include /* exit() */ +#include /* memset(), memcpy() */ +#include /* uname() */ +#include +#include /* socket(), bind(), + listen(), accept() */ +#include +#include +#include +#include +#include +#include +#include /* fork(), write(), close() */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define OK 0 +#define FAIL -1 +#define MAX_BUFLEN 1048576 +#define TCP_PORT_NUMBER 2233 + +#define SRVNAME "localhost" + +/**************************************************************************/ +/* Constants used by this program */ +/**************************************************************************/ +//#define SERVER_PORT 50004 +#define SERVER_PORT 32410 +#define SERVER_IP "192.168.12.1" // this is eth0 /eth1 interface for pc8829 +#define BUFFER_LENGTH 4096 +#define FALSE 0 + +/* global (across fork() variable and structs */ + +struct myring{ + uint64_t framenum; + uint64_t bunchid; + uint16_t imagedata[512*1024]; +}; + +struct myring *ringbuffer; +long *putpt; +long *getpt; +int intimeout; + +int ib; +int main(int argc,char *argv[]) +{ + /***********************************************************************/ + /* Variable and structure definitions. */ + /***********************************************************************/ + + +#pragma pack(push) +#pragma pack(2) + + + + struct mystruct{ +/* char emptyheader[6]; */ + uint64_t framenum; + uint32_t exptime; + uint32_t packetnum; + uint64_t bunchid; + uint64_t timestamp; + uint16_t moduleID; + uint16_t xCoord; + uint16_t yCoord; + uint16_t zCoord; + uint32_t debug; + uint16_t roundRobin; + uint8_t detectortype; + uint8_t headerVersion; + uint16_t data[BUFFER_LENGTH]; + }; + struct mystruct packet; + + + +#pragma pack(pop) + + + + struct timeval tss,tsss; //for timing + double tdif; + + int sd=-1, rc1,rc; + FILE * sfilefd=NULL; + int framesinfile=0 ; +#define NPRI 100 + int serverport; + int npri=NPRI; + int npacket=0; + int totalnpacket=0; + int printeach=0; + + float rpratio,aimageration; + + uint16_t chipd[256*256]; + char savefilename[128]; + char serverip[128]; + char savefileprefix[128]; + struct sockaddr_in serveraddr; + struct sockaddr_in clientaddr; + socklen_t clientaddrlen = sizeof(clientaddr); + int isfo=0; + + int fileindex=0; + int chidy; + int chidx; + int ipxoffs; + int idy; + uint16_t *imagept; + int packetframenumold=0; + int packetframenumfirst=-1; + uint64_t packetframenumcurr; + int nframes=0; + int localpt=0; + int chid=8; + int ncompleteframes=0; + int top,bot; + int nosave=0; + intimeout=0; + int maxringsize=0; + int islastpacket=0; +#define A_C_RED "\x1b[31m" +#define A_C_RESET "\x1b[0m" + + if ((argc>2)&&(strcmp(argv[1],"N")==0)) {nosave=1;} + if ((argc>2)&&(strcmp(argv[1],"S")==0)) { top=1;bot=1;} + if ((argc>2)&&(strcmp(argv[1],"ST")==0)) { top=1;bot=0;} + if ((argc>2)&&(strcmp(argv[1],"SB")==0)) { top=0;bot=1;} + if ((argc>2)&&(strcmp(argv[1],"C0")==0)) { top=0;bot=0;chid=0;} + if ((argc>2)&&(strcmp(argv[1],"C1")==0)) { top=0;bot=0;chid=1;} + if ((argc>2)&&(strcmp(argv[1],"C2")==0)) { top=0;bot=0;chid=2;} + if ((argc>2)&&(strcmp(argv[1],"C3")==0)) { top=0;bot=0;chid=3;} + if ((argc>2)&&(strcmp(argv[1],"C4")==0)) { top=0;bot=0;chid=4;} + if ((argc>2)&&(strcmp(argv[1],"C5")==0)) { top=0;bot=0;chid=5;} + if ((argc>2)&&(strcmp(argv[1],"C6")==0)) { top=0;bot=0;chid=6;} + if ((argc>2)&&(strcmp(argv[1],"C7")==0)) { top=0;bot=0;chid=7;} + + int kkk=0; + chidy=(int)(chid/4); + chidx=(chid%4); + ipxoffs=chidy*1024*256; //offset: zero if in the bottom part, half image otherwise + + if ((argc>3)) + { + sprintf(serverip,"%s",argv[3]); + printf("ip is %s \n",serverip); + + + + } + else + { //using hardcoded default SERVER_IP + sprintf(serverip,"%s",SERVER_IP); + printf("ip is %s \n",serverip); + } + + if ((argc>4)) + { + serverport=atoi(argv[4]); + printf("port is %d \n",serverport); + + } + else + { + serverport=SERVER_PORT; + } + + + sprintf((char*)(&savefileprefix),"%s",argv[2]); + +#define RINGSIZE 2048 + + printf( A_C_RED "Using receive ip %s port %d \n"A_C_RESET ,serverip,serverport); + printf("server started with top= %d bot =%d \n",top,bot); + if (nosave!=1) { printf( "saving to %sxxxxxxxx.bin \n",savefileprefix);} else{ printf( "not saving\n"); } + + printf("ring buffer size %d MB (images) \n", RINGSIZE); + + + + ringbuffer = mmap(NULL, sizeof (struct myring)*RINGSIZE, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + + putpt = mmap(NULL, sizeof(long), PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + getpt = mmap(NULL, sizeof(long), PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_ANONYMOUS, -1, 0); + *getpt=0; + *putpt=0; + + pid_t fid,fid2; + + fid=fork(); + if (fid==-1) perror("fork"); + + + + + if (fid==0) { + + + + struct sched_param Priority_Param; //struct to set priority + int policy=SCHED_FIFO; //kind of policy desired, either SCHED_FIFO or SCHED_RR, + int prioInit = 95; + Priority_Param.sched_priority = prioInit; + sched_setscheduler(0,policy,&Priority_Param); + //printf("PARENT %d %d %d \n", frt, &ringbuffer[1], ringbuffer); + if(errno == EPERM) { perror("the calling thread does not have appropriate privileges"); + perror("Please change /etc/limits.conf and relogin"); + } + + + + sd = socket(AF_INET, SOCK_DGRAM, 0); + if (sd==-1) perror("socket()"); + + +#define SOCKET_BUFFER_SIZE (2000*1024*1024) //2GB + int val=SOCKET_BUFFER_SIZE; + setsockopt(sd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(int)); + + if (sd < 0) { perror("socket() failed"); } + + memset(&serveraddr, 0, sizeof(serveraddr)); + serveraddr.sin_family = AF_INET; + serveraddr.sin_port = htons(serverport); + serveraddr.sin_addr.s_addr = inet_addr(serverip); + + rc = bind(sd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); + if (rc < 0) { perror(A_C_RED" main bind() failed"A_C_RESET); } + + gettimeofday(&tsss,NULL); + + while (1==1) { + + + + rc1 = recvfrom(sd, &packet, sizeof(packet), 0, + (struct sockaddr *)&clientaddr, + &clientaddrlen); + + + //printf(" %d %ld %ld \n", rc1, sizeof(packet) , packet.framenum ); + if ((rc1 < 0)) { perror("recvfrom() failed"); break; } + + npacket++; + totalnpacket++; + + + memcpy((char*)( (char*)(ringbuffer[localpt].imagedata)+((int)(packet.packetnum)*8192) ),packet.data,4096*sizeof(uint16_t)); + if (((int)((packet.packetnum)))==127) {islastpacket=1; }else {islastpacket=0;} + + //printf("receiver %d %ld %ld %d %d \n", localpt, (long) &ringbuffer[localpt] ,(long) ringbuffer[localpt].imagedata ,(int)((packet.packetnum)), 4096*sizeof(uint16_t) ); + + + + + if (islastpacket==1) { + + + + packetframenumcurr= packet.framenum; + + if ((packetframenumfirst==-1)||(packetframenumfirst>packetframenumcurr)){ packetframenumfirst=packetframenumcurr; packetframenumold=packetframenumcurr; } //this is the first packet ever + + nframes++; + + if (npacket==128){ //we assume we have the full image. + ncompleteframes++; + + ringbuffer[localpt].framenum=packetframenumcurr; + ringbuffer[localpt].bunchid=(uint64_t)(packet.debug); + + + + *putpt=*putpt+1; + localpt=(*putpt)%RINGSIZE; + + if ((*putpt-*getpt)>maxringsize) maxringsize=(*putpt-*getpt); //for the printout only. + + while ((*putpt-*getpt)>RINGSIZE-1) { //do not overflow the buffer + usleep(50000) ; + } + + + + + + if ((printeach%npri)==npri-1) { + + tss=tsss; + gettimeofday(&tsss,NULL); + tdif=(1e6*(tsss.tv_sec - tss.tv_sec)+(long)(tsss.tv_usec)-(long)(tss.tv_usec)); + rpratio=(totalnpacket/(float)128)/(float)(packetframenumcurr-packetframenumold); + aimageration=ncompleteframes/(float)(packetframenumcurr-packetframenumfirst); + + npri=(int) ((float)(500000)/(((float)(tdif)/(float)(packetframenumcurr-packetframenumold)))/50.0 + 1) * 50; + //printf("npri = %d %f \n " ,npri,(float)(tdif)/(float)(packetframenumcurr-packetframenumold) ); + if ((rpratio!=1)&&(printeach!=npri-1)) { + printf(A_C_RED "Average hardware rate = %4d Hz, Average Rcv. image ratio= %1.7f Rcv. pck. ratio %1.5f , ringbuf max =%4d/%4d Fr#: %ld \n" A_C_RESET, (int)( 1e6/(tdif/(packetframenumcurr-packetframenumold))), aimageration, rpratio ,maxringsize,RINGSIZE,packetframenumcurr-packetframenumfirst); + } + else { + printf("Average hardware rate = %4d Hz, Average Rcv. image ratio= %1.7f Rcv. pck. ratio %1.5f , ringbuf max =%4d/%4d Fr#: %ld \n", (int)( 1e6/(tdif/(packetframenumcurr-packetframenumold))), aimageration, rpratio ,maxringsize,RINGSIZE,packetframenumcurr-packetframenumfirst); + + printf("timestamp: %lds.%ldns SC= 0x%x 0x%x", (packet.timestamp)/10000000,((packet.timestamp)%10000000)/10, (((packet.debug)>>8)&0xf), packet.headerVersion); + + // printf("SC= 0x%x ", (((packet.debug)>>8)&0xf)); + + // printf("BunchidC= %ld ", (packet.bunchid)); + + } + + packetframenumold=packetframenumcurr; + maxringsize=0; + totalnpacket=0; + + } //end of if print + printeach=printeach+1; + } //end of if (npacket==128) + + + + + + + else + { + // if debug + //printf("npacket= %d,packet.framenum = %ld \n",npacket, (long)packet.framenum ); + + + } + + + + + + + + + npacket=0; + } + } + + + + + } //end of if parent + else + + { //is child (file writer or display server + + fid2=fork(); + + if (fid2==0) { //parent = display server + struct sockaddr_in dyserveraddr; + int dyoptval = 1; + int dyfd = socket(AF_INET,SOCK_STREAM , 0); + if (dyfd==-1) perror("socket()"); + if (setsockopt(dyfd, SOL_SOCKET, SO_REUSEADDR, &dyoptval, sizeof(dyoptval)) == -1) perror("setsockopt"); + + memset(&dyserveraddr, 0, sizeof(dyserveraddr)); + dyserveraddr.sin_family = AF_INET; + dyserveraddr.sin_port = htons(33410); //serverport+1000; //htons(33410); //serverport+1000 + dyserveraddr.sin_addr.s_addr = inet_addr(serverip); + + if (bind(dyfd, (struct sockaddr *)&dyserveraddr , sizeof(dyserveraddr))==-1 ) perror("display bind"); + printf("Im display server, binding port %d \n",dyserveraddr.sin_port);//serverport+1000); + + + + socklen_t dyaddrlen; + struct sockaddr_storage claddr; +#define ADDRSTRLEN (NI_MAXHOST + NI_MAXSERV + 10) + char addrStr[ADDRSTRLEN]; + char host[NI_MAXHOST]; + char service[NI_MAXSERV]; + char request[5]; + dyaddrlen = sizeof(struct sockaddr_storage); + + if (listen(dyfd, 1) == -1) perror("listen"); + + for (;;) { + + int cfd = accept(dyfd, (struct sockaddr *) &claddr, &dyaddrlen); + if (cfd == -1) perror("accept"); + + if (getnameinfo((struct sockaddr *) &claddr, dyaddrlen,host, NI_MAXHOST, service, NI_MAXSERV, 0) == 0) + snprintf(addrStr, ADDRSTRLEN, "(%s, %s)", host, service); + else + snprintf(addrStr, ADDRSTRLEN, "(?UNKNOWN?)"); + + printf("Connection from %s\n", addrStr); + + long frametosendold=0; + long frametosend=0; + + + uint32_t sc_tolook_for = 0; + int count_since_scfound = 0; + + while (1==1){ + + /* Read client request, send image back */ + int nread=read(cfd,&request,5); + if (nread<=0) break; + + // printf("%d bytes read %d # from %s : %s\n", nread, con++,addrStr, request); + + if (strcmp(request,"give\n")==0) { + + + // sc_tolook_for=15-(kkk%3); + // kkk++; + sc_tolook_for=13; + while (frametosend<=frametosendold){ + usleep(20000); //TBC + for (ib=1;ib<=15;ib++) { + + localpt=((*putpt)-(long)(ib))%RINGSIZE; + //if ((((ringbuffer[localpt].bunchid)>>8)&0xf)==2){ frametosend=(*putpt)-ib; break;} + // replaced with 'timeout' + if ((count_since_scfound > 100 )) { // tunable + printf("Receiver to online disp: going to look for sc15 any SC instead of SCX\n"); + intimeout=1; + + } + + + if (((((ringbuffer[localpt].bunchid)>>8)&0xf)==sc_tolook_for)|(intimeout==1)) { + + + if ((((ringbuffer[localpt].bunchid)>>8)&0xf)==sc_tolook_for){ + intimeout=0; + count_since_scfound=0; + } + + + + frametosend=(*putpt)-ib; + // printf("sending sc %ld \n", ((ringbuffer[localpt].bunchid)>>8)&0xf ); + break; + + // frametosend=(*putpt)-ib; break; + + + } else { + count_since_scfound++; + + } + // end of 'timeout' modification + + } + + + + + } + localpt=frametosend%RINGSIZE; + // write(cfd, &request,5); + + int wr1=0; + + + while (wr1+write(cfd, &(ringbuffer[localpt].imagedata[(int)(wr1/2)]), 2*1024* 512-wr1)<1024*512*2) { ;} + + + printf("sent image with SC %d \n ",(int)(((ringbuffer[localpt].bunchid)>>8)&0xf)); + + // if (frametosend>10000) {close(cfd);close(sd); } + + + + frametosendold=frametosend; + + } + + + + }//end of while 1==1 + + if (close(cfd) == -1) /* Close connection */ + perror("close"); + + } //end of for accept + + } + + + + + else + { //child child file writer + + + struct sched_param Priority_Param; //struct to set priority + int policy=SCHED_FIFO; //kind of policy desired, either SCHED_FIFO or SCHED_RR, + int prioInit = 45; + Priority_Param.sched_priority = prioInit; + sched_setscheduler(0,policy,&Priority_Param); + //printf("PARENT %d %d %d \n", frt, &ringbuffer[1], ringbuffer); + if(errno == EPERM) { perror("the calling thread does not have appropriate privileges"); + perror("Please change /etc/limits.conf and relogin"); + } + + + while (1==1){ + + while ((*putpt-*getpt)<1) { + + usleep(50000); + if (isfo==1) fflush(sfilefd); + + } + + if ((framesinfile%10000)==0){ + + if (isfo==1) {fileindex++; fclose(sfilefd);isfo=0;}; + sprintf(savefilename,"%s%06d.dat",savefileprefix,fileindex); + + + if (nosave!=1) { + + sfilefd=fopen((const char *)(savefilename),"w"); + if (sfilefd!=NULL) {isfo=1; } + else + { perror("file opening not succesfull");} + printf("saving to ... %s \n",savefilename); + } + + framesinfile=0; + usleep(10000); + } + + + if (nosave!=1) fwrite((char*)(&ringbuffer[localpt]), 1 ,16 ,sfilefd); //this writes the header + imagept=(uint16_t*)((char*)(ringbuffer[localpt].imagedata)); + + + //printf("writer %d %ld \n", localpt, (long) imagept ); + + + if ((top==1)||(bot==1)) + { + if (nosave!=1) fwrite(imagept+(1-bot)*1024*256, 2 ,4096*64*(top+bot) ,sfilefd); + } + else //end of else if ((top==1)||(bot==1))single chip + { + chidy=(int)(chid/4); + chidx=(chid%4); + ipxoffs=chidy*1024*256; //offset: zero if in the bottom part + for (idy=0;idy<256;idy++){ + memcpy(&chipd[idy*256],&imagept[ipxoffs+idy*1024+chidx*256],256*sizeof(uint16_t)); + }// end of for idy + if (nosave!=1) fwrite(&chipd, 256*256 ,sizeof(uint16_t),sfilefd); + } //end of else if ((top==1)||(bot==1)) + + *getpt= *getpt+1; + localpt=(*getpt)%RINGSIZE; + framesinfile++; + ncompleteframes++; + printeach=printeach+1; + + + } + } //end of is child file writer + + + }//end of is child file writer or display server + + + /***********************************************************************/ + /* Close down any open socket descriptors */ + /***********************************************************************/ + if (sd != -1) + close(sd); + + return 0; +}