/*-------------------------------------------------------------------------- * A module which reads the broadcast messages from the PSI Accelerator group * and allows to use the information in them in SICS. This facility runs * as a SICS task which tries to read incoming messages and stores the * messages of interest in an internal data structure. Some code is provided * to make the content of these messages available within SICS. This code * follows very closely the DoesItComeMC program from Demir Anici * * The module also holds a circular backlog of the last MAXLOG measurements. * This is used in order to calulcate an average. * * copyright: see file COPYRIGHT * * Mark Koennecke, July 2005 * * Modified to also hold the code for a second version which * uses the redirector on lnsl15. This has to be used by instruments in * the private network as they cannot receive the ACS broadacst. * * Mark Koennecke, July 2010 */ #include #include #include #include #include #include #include #include #include #include "dgrambroadcast.h" #include "sinq.h" #include #ifdef SEND_PORT #define RECEIVE_PORT SEND_PORT #else #define RECEIVE_PORT 0xABCC #endif #define MAX_BLEN 2048 /*====================== life and death =====================================*/ static int SinqTask(void *data) { pSinq self = (pSinq) data; char buff[MAX_BLEN]; int status, sinq; if (self == NULL) { return 0; } status = selectReceiveSocket(self->receiveSocket, 0); if (status <= 0) { /* * no pending message */ return 1; } memset(buff, 0, MAX_BLEN); status = read(self->receiveSocket, buff, MAX_BLEN); if (status < 0) { ServerWriteGlobal("WARNING: failed to read Sinq Status", eWarning); return 1; } if (memcmp(buff, "D110", 4) == 0) { strcpy(self->d110, buff); sinq = getSinqBeam(self, SINQBEAM); self->lastSinq[self->lastCount] = sinq; self->lastCount++; if (self->lastCount >= MAXLOG) { self->lastCount = 0; } } if (memcmp(buff, "A110", 4) == 0) { strcpy(self->a110, buff); } /* * ignore any other message */ return 1; } /*------------------------------------------------------------------------*/ static void KillSinq(void *data) { pSinq self = (pSinq) data; if (self == NULL) { return; } if (self->pDes != NULL) { DeleteDescriptor(self->pDes); } if(self->anet){ ANETclose(self->receiveSocket); } free(self); } /*-------------------------------------------------------------------------*/ int SinqFactory(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { pSinq pNew = NULL; int i; pNew = (pSinq) malloc(sizeof(Sinq)); if (pNew == NULL) { SCWrite(pCon, "ERROR: out of memory allocating Sinq", eError); return 0; } memset(pNew, 0, sizeof(Sinq)); pNew->pDes = CreateDescriptor("Sinq"); if (pNew->pDes == NULL) { SCWrite(pCon, "ERROR: out of memory allocating Sinq", eError); free(pNew); return 0; } pNew->receiveSocket = openReceiveSocket(RECEIVE_PORT); if (pNew->receiveSocket < 0) { SCWrite(pCon, "ERROR: failed to open Sinq Status Broadcast port", eError); KillSinq(pNew); return 0; } for (i = 0; i < MAXLOG; i++) { pNew->lastSinq[i] = -200; } TaskRegisterN(pServ->pTasker,"sinq", SinqTask, NULL, NULL, pNew, TASK_PRIO_LOW); return AddCommand(pSics, "sinq", SinqWrapper, KillSinq, pNew); } /*===================== actual Action ====================================*/ extern char *trim(char *txt); /*-----------------------------------------------------------------------*/ int getSinqBeam(pSinq self, int code) { int result, i; char *pPtr; if (self == NULL) { return -900; } switch (code) { case SINQBEAM: pPtr = strstr(&self->d110[4], "MHC6"); break; case RINGBEAM: pPtr = strstr(&self->d110[4], "MHC3"); break; default: return -900; } if (pPtr == NULL) { printf("Invalid Sinq message: %s\n", &self->d110[4]); return -900; } pPtr = strstr(pPtr, ":"); if (pPtr == NULL) { return -900; } pPtr++; /* * zero out units */ for (i = strlen(pPtr); i > 0; i--) { if (isdigit(pPtr[i])) { break; } else { pPtr[i] = '\0'; } } return atoi(trim(pPtr)); } /*-------------------------------------------------------------------------*/ int SinqWrapper(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { pSinq self = (pSinq) pData; char pBueffel[132]; int sum, count, i, avg; assert(self != NULL); if (argc < 2) { SCWrite(pCon, &self->a110[4], eValue); return 1; } strtolower(argv[1]); if (strcmp(argv[1], "beam") == 0) { snprintf(pBueffel, 131, "sinq.beam = %d", getSinqBeam(self, SINQBEAM)); } else if (strcmp(argv[1], "ring") == 0) { snprintf(pBueffel, 131, "sinq.ring = %d", getSinqBeam(self, RINGBEAM)); } else if (strcmp(argv[1], "beamavg") == 0) { for (i = 0, count = 0, sum = 0; i < MAXLOG; i++) { if (self->lastSinq[i] > -50) { count++; sum += self->lastSinq[i]; } } if (count > 0) { avg = sum / count; } else { avg = 0; } snprintf(pBueffel, 131, "sinq.beamavg = %d", avg); } else { SCWrite(pCon, "ERROR: invalid key, I understand: beam, ring", eError); return 0; } SCWrite(pCon, pBueffel, eValue); return 1; } /*-------------------------------------------------------------------------*/ static char* searchMessage(char *pPtr, int length) { int i; for(i = 0; i < length; i++){ if(pPtr[i] == (char) 4 && pPtr[i+1] == (char)4){ return pPtr + i-1; } } return NULL; } /*-------------------------------------------------------------------------*/ static int SINQRedirectCallback(int handle, void *userData){ pSinq self = (pSinq)userData; char *pPtr = NULL, *pTerm = NULL,pID[5]; int length, sinq; pPtr = ANETreadPtr(handle, &length); pTerm = searchMessage(pPtr,length); while(pTerm != NULL){ /* strlcpy(pID, pPtr, 5); printf("Received message with ID: %s\n", pID); */ if (memcmp(pPtr, "D110", 4) == 0) { strlcpy(self->d110, pPtr, pTerm - pPtr); sinq = getSinqBeam(self, SINQBEAM); self->lastSinq[self->lastCount] = sinq; self->lastCount++; if (self->lastCount >= MAXLOG) { self->lastCount = 0; } } if (memcmp(pPtr, "A110", 4) == 0) { strlcpy(self->a110, pPtr, pTerm - pPtr); } ANETreadConsume(handle, (pTerm - pPtr) + 3); pPtr = ANETreadPtr(handle, &length); pTerm = searchMessage(pPtr,length); } return 1; } /*-------------------------------------------------------------------------*/ int SinqRedirectFactory(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { pSinq pNew = NULL; int i; if(argc < 3){ SCWrite(pCon,"ERROR: need host port argument to SinqRedirectFactory", eError); return 0; } pNew = (pSinq) malloc(sizeof(Sinq)); if (pNew == NULL) { SCWrite(pCon, "ERROR: out of memory allocating Sinq", eError); return 0; } memset(pNew, 0, sizeof(Sinq)); pNew->pDes = CreateDescriptor("Sinq"); if (pNew->pDes == NULL) { SCWrite(pCon, "ERROR: out of memory allocating Sinq", eError); free(pNew); return 0; } pNew->receiveSocket = ANETconnect(argv[1],atoi(argv[2])); if (pNew->receiveSocket < 0) { SCWrite(pCon, "ERROR: failed to open Sinq Status port", eError); KillSinq(pNew); return 0; } ANETsetReadCallback(pNew->receiveSocket, SINQRedirectCallback, pNew, NULL); pNew->anet = 1; for (i = 0; i < MAXLOG; i++) { pNew->lastSinq[i] = -200; } return AddCommand(pSics, "sinq", SinqWrapper, KillSinq, pNew); }