diff --git a/asynnet.c b/asynnet.c index 46592d9b..7b707e61 100644 --- a/asynnet.c +++ b/asynnet.c @@ -581,6 +581,7 @@ void *ANETreadPtr(int handle, int *length) con = findSocketDescriptor(handle); if (con == NULL) { + *length = 0; return NULL; } else { data = GetRWBufferData(con->readBuffer, length); diff --git a/conman.c b/conman.c index a1d62e34..13a4dd2b 100644 --- a/conman.c +++ b/conman.c @@ -74,6 +74,15 @@ #include "sicshipadaba.h" #include "protocol.h" #include "sicsvar.h" +#include + +/* + Greetings from protocol.c for SCLogWrite... +*/ +extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer, + int iOut); + + /* #define UUDEB 1 define UUDEB , for buffer writing for checking encoding */ @@ -255,6 +264,7 @@ static SConnection *CreateConnection(SicsInterp * pSics) pRes->conStart = time(NULL); pRes->write = SCNormalWrite; pRes->runLevel = RUNDRIVE; + pRes->remote = 0; /* initialise context variables */ pRes->iCmdCtr = 0; @@ -487,6 +497,7 @@ SConnection *SCCopyConnection(SConnection * pCon) result->iList = -1; result->runLevel = pCon->runLevel; result->data = pCon->data; + result->remote = pCon->remote; return result; } @@ -1080,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; /* for commandlog tail */ if (!VerifyConnection(self)) { return 0; } - if(self->iProtocolID == 5) { + if(self->iProtocolID == PROTACT) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1098,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == PROTJSON) { + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } @@ -1111,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; if (!VerifyConnection(self)) { return 0; @@ -1119,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) WriteToCommandLogId(NULL, self->sockHandle, buffer); SetSendingConnection(NULL); - if(self->iProtocolID == 5) { + if(self->iProtocolID == PROTACT) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1131,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 2) { + } else if(self->iProtocolID == PROTCODE) { /* withcode */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1143,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == PROTJSON) { /* json */ + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } @@ -1323,7 +1348,7 @@ int SCWriteZipped(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); @@ -1398,7 +1423,7 @@ int SCWriteBinary(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN BIN %s %d %d\r\n", pName, iDataLen, cc.transID); @@ -1504,7 +1529,7 @@ int SCWriteZippedOld(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); @@ -1830,7 +1855,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) memset(pBueffel, 0, 80); stptok(trim(pCommand), pBueffel, 79, " "); self->iCmdCtr++; - if (999999 < self->iCmdCtr) { + if (self->iCmdCtr > 99998) { self->iCmdCtr = 0; } self->transID = self->iCmdCtr; @@ -1856,6 +1881,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) config File Filename Logs to another file config output normal | withcode | ACT Sets output mode config listen 0 | 1 enables commandlog listen mode + config remote sets the remote connection flag ---------------------------------------------------------------------------*/ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, @@ -1913,7 +1939,11 @@ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, SCSendOK(pCon); return 1; } - } + } else if(strcmp(argv[1],"remote") == 0) { + pMaster->remote = 1; + pCon->remote = 1; + return 1; + } /* check no or args */ if (argc < 3) { diff --git a/conman.h b/conman.h index 9056bde7..85c4f319 100644 --- a/conman.h +++ b/conman.h @@ -71,6 +71,7 @@ typedef struct __SConnection { pCosta pStack; /* stack of pending commands */ int contextStack; /* context stack: may go? */ mkChannel *pSock; /* for temporary backwards compatability */ + int remote; /* true if this is a remote object connection */ } SConnection; #include "nserver.h" diff --git a/countersec.c b/countersec.c index e393786b..2ec4f3e8 100644 --- a/countersec.c +++ b/countersec.c @@ -184,13 +184,17 @@ static int SecCtrCheckStatus(void *pData, SConnection *pCon) fControl = v.v.doubleValue; } else { node = GetHipadabaNode(self->pDes->parNode,"values"); - assert(node != NULL); - /* - The 1 below is only correct for PSI where only the first - monitor can be the control monitor. Elsewhere this must be the - control monitor channel - */ - fControl = v.v.intArray[1]; + if(node != NULL) { + /* + This can be NULL if the counter is a HM. The values does not + exist and fControl is useless + + The 1 below is only correct for PSI where only the first + monitor can be the control monitor. Elsewhere this must be the + control monitor channel + */ + fControl = v.v.intArray[1]; + } } diff --git a/devser.c b/devser.c index fc1f90b2..bb6a7a92 100644 --- a/devser.c +++ b/devser.c @@ -151,9 +151,6 @@ static DevAction *DevNextAction(DevSer * devser) } static void LogStart(DevSer *self) { - if(self->startTime > 0){ - printf("DEVSER: there is something fucked up in LogStart. Investigate!\n"); - } self->startTime = DoubleTime(); } static void LogResponse(DevSer *self, int error) diff --git a/fourmess.c b/fourmess.c index c497e21e..ac72764a 100644 --- a/fourmess.c +++ b/fourmess.c @@ -411,6 +411,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon, SCWrite(pCon, "ERROR: store: no files open", eLogError); return 0; } + priv->count++; /* get necessary data */ fSum = 0.; @@ -497,7 +498,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon, fPreset, fTemp, prot, pBueffel); } else { fprintf(priv->profFile, "%3d %7.4f %9.0f %7.3f %12f %s %s\n", iNP, fStep, - fPreset, fTemp, prot, extra, pBueffel); + fPreset, fTemp, prot, pBueffel,extra); } for (i = 0; i < iNP; i++) { for (ii = 0; ii < 10 && i < iNP; ii++) { diff --git a/interface.c b/interface.c index d661b2c3..6f84a604 100644 --- a/interface.c +++ b/interface.c @@ -242,6 +242,9 @@ static int DriveTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); + } return 0; } /*--------------------------------------------------------------------------*/ @@ -272,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget) ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); InvokeNewTarget(pServ->pExecutor,name,fTarget); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = DRIVEID; taskData->obj = obj; @@ -393,6 +399,9 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); + } return 0; } /*--------------------------------------------------------------------------*/ @@ -418,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name) } ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = COUNTID; taskData->obj = obj; diff --git a/motorsec.c b/motorsec.c index 6f0ed33a..723326f7 100644 --- a/motorsec.c +++ b/motorsec.c @@ -298,7 +298,7 @@ static int SecMotorStatus(void *sulf, SConnection * pCon) int status; pHdb node = NULL; hdbValue v; - float interrupt; + float interrupt = 0.; char error[132]; assert(sulf); @@ -527,7 +527,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, SCSetInterrupt(pCon, eAbortBatch); self->pDrivInt->iErrorCount = 0; child = GetHipadabaNode(self->pDescriptor->parNode, "status"); - UpdateHipadabaPar(child, MakeHdbText("run"), pCon); + UpdateHipadabaPar(child, MakeHdbText("error"), pCon); return hdbAbort; } diff --git a/nread.c b/nread.c index 3cb546e3..33b98f5a 100644 --- a/nread.c +++ b/nread.c @@ -42,6 +42,8 @@ #include "commandlog.h" #include "uselect.h" #include "trace.h" +#include "protocol.h" + extern pServer pServ; extern int VerifyChannel(mkChannel * self); /* defined in network.c */ @@ -296,7 +298,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem) if (strlen(pItem->pHold) > 0) { strlcat(pItem->pHold, pPtr, 511); /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -308,7 +310,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem) } else { /* no, normal command */ /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -498,7 +500,7 @@ static int TelnetRead(pNetRead self, pNetItem pItem) case '\r': case '\n': /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -1076,7 +1078,7 @@ static int CommandDataCB(int handle, void *userData) if (pPtr[i] == '\r' || pPtr[i] == '\n') { self->state = SKIPTERM; if (!testAndInvokeInterrupt(self, handle)) { - if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack)) + if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack)) status = 0; else status = CostaTop(self->pCon->pStack, GetCharArray(self->command)); @@ -1180,7 +1182,7 @@ static int ANETTelnetProcess(int handle, void *usData) case '\r': case '\n': if (!testAndInvokeInterrupt(self, handle)) { - if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack)) + if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack)) status = 0; else status = CostaTop(self->pCon->pStack, GetCharArray(self->command)); diff --git a/protocol.c b/protocol.c index b35bfcd1..800698e9 100644 --- a/protocol.c +++ b/protocol.c @@ -32,6 +32,10 @@ typedef struct __Protocol { int isDefaultSet; char *pProList[PROLISTLEN]; /* list of valid protocols? */ } Protocol; +/*================================================================================================ + WARNING: These two char arrays may replicate things defined elsewhere. They may be out of + sync with the rest of SIS. Keep in mind..... + ==================================================================================================*/ char *pEventType[] = { "VALUECHANGE", /* 0 */ @@ -188,7 +192,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData, SCWrite(pCon, "ERROR: no more memory", eError); return 0; } + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID); + } status = InterpExecute(pSics, comCon, command); + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMEND %d", comCon->transID); + } if (command != buffer) free(command); SCDeleteConnection(comCon); @@ -271,29 +281,29 @@ static int ProtocolSet(SConnection * pCon, Protocol * pPro, char *pProName) return 0; break; - case 1: /* normal (connection start default) */ + case PROTNORM: /* normal (connection start default) */ SCSetWriteFunc(pMaster, SCNormalWrite); SCSetWriteFunc(pCon, SCNormalWrite); break; - case 2: /* outcodes */ + case PROTCODE: /* outcodes */ SCSetWriteFunc(pMaster, SCWriteWithOutcode); SCSetWriteFunc(pCon, SCWriteWithOutcode); break; - case 3: /* json */ + case PROTJSON: /* json */ SCSetWriteFunc(pCon, SCWriteJSON_String); SCSetWriteFunc(pMaster, SCWriteJSON_String); break; - case 4: /* ACT */ + case PROTACT: /* ACT */ SCSetWriteFunc(pMaster, SCACTWrite); SCSetWriteFunc(pCon, SCACTWrite); break; - case 5: + case PROTALL: SCSetWriteFunc(pMaster, SCAllWrite); SCSetWriteFunc(pCon, SCAllWrite); break; - case 0: /* default = psi_sics */ + case PROTSICS: /* default = psi_sics */ default: SCSetWriteFunc(pMaster, pPro->defaultWriter); SCSetWriteFunc(pCon, pPro->defaultWriter); @@ -332,11 +342,11 @@ int ProtocolGet(SConnection * pCon, void *pData, char *pProName, int len) /* check list of protocols for valid name */ switch (Index) { - case 0: /* default = psi_sics */ - case 1: /* normal (connection start default) */ - case 2: /* outcodes */ - case 3: /* json */ - case 4: /* act */ + case PROTSICS: /* default = psi_sics */ + case PROTNORM: /* normal (connection start default) */ + case PROTCODE: /* outcodes */ + case PROTJSON: /* json */ + case PROTACT: /* act */ pProName = pPro->pProList[Index]; return 1; break; @@ -441,7 +451,7 @@ static int InitDefaultProtocol(SConnection * pCon, Protocol * pPro) if (0 == pPro->isDefaultSet) { pPro->defaultWriter = SCGetWriteFunc(pCon); pPro->isDefaultSet = 1; - pCon->iProtocolID = 0; + pCon->iProtocolID = PROTSICS; } return pPro->isDefaultSet; } @@ -628,10 +638,11 @@ char *GetProtocolName(SConnection * pCon) /* check list of protocols for valid name */ switch (pCon->iProtocolID) { - case 0: /* default = psi_sics */ - case 1: /* normal (connection start default) */ - case 2: /* outcodes */ - case 3: /* json */ + case PROTSICS: /* default = psi_sics */ + case PROTNORM: /* normal (connection start default) */ + case PROTCODE: /* outcodes */ + case PROTJSON: /* json */ + case PROTACT: /* act */ return strdup(pPro->pProList[pCon->iProtocolID]); break; default: @@ -654,13 +665,13 @@ writeFunc GetProtocolWriteFunc(SConnection * pCon) { if (pCon != NULL) { switch (pCon->iProtocolID) { - case 2: /* outcodes */ + case PROTCODE: /* outcodes */ return SCWriteWithOutcode; break; - case 3: /* json */ + case PROTJSON: /* json */ return SCWriteJSON_String; break; - case 4: + case PROTACT: return SCACTWrite; break; default: diff --git a/protocol.h b/protocol.h index 4151b4f7..67316967 100644 --- a/protocol.h +++ b/protocol.h @@ -15,6 +15,14 @@ static char *pProTags[3] = { #define esStart -1 #define esFinish -2 +/*---------------------- protocol defines -------------------------------*/ +#define PROTSICS 0 +#define PROTNORM 1 +#define PROTCODE 2 +#define PROTJSON 3 +#define PROTACT 4 +#define PROTALL 5 + /*--------------------- lifecycle -------------------------------------- */ int InstallProtocol(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]); diff --git a/remoteobject.c b/remoteobject.c index 5cad0a91..0e78a3a4 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -2,7 +2,7 @@ * Remote objects in sicsobj. This means accessing remote objects in a different * SICS server from a master SICS server. * - * Reading is implementd according to this scheme: + * Reading is implemented according to this scheme: * * * When a read connection is made between a local node and a remote node in slave, then a * callback is installed on remote node in slave. @@ -17,7 +17,7 @@ * * COPRYRIGHT: see file COPYRIGHT * - * Mark Koennecke, February 2015 + * Mark Koennecke, February-May 2015 **/ #include #include @@ -31,22 +31,29 @@ #include #include #include +#include #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ +#define READACT 7654 +#define POCHACT 8437 + static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); + +static int transactionID = 100000; /*---------------------- our very private data structure -------------------*/ typedef struct { char *host; int port; - int readHandle; - int writeHandle; - int writeInUse; + int handle; + int transactHandle; int readList; + int writeList; unsigned int connected; time_t nextHeartbeat; + struct json_tokener *jtok; } RemoteOBJ, *pRemoteOBJ; /*----------------------------------------------------------------------------*/ typedef struct { @@ -59,6 +66,12 @@ typedef struct { char *remotePath; } UpdateCallback, *pUpdateCallback; /*----------------------------------------------------------------------------*/ +typedef struct { + int transID; + SConnection *pCon; + int waitTask; +}writeData, *pWriteData; +/*----------------------------------------------------------------------------*/ void KillRemoteOBJ(void *data) { char roTaskName[132]; @@ -68,66 +81,13 @@ void KillRemoteOBJ(void *data) snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); StopTask(pServ->pTasker,roTaskName); free(self->host); - ANETclose(self->readHandle); - ANETclose(self->writeHandle); + ANETclose(self->handle); + ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); + LLDdeleteBlob(self->writeList); + json_tokener_free(self->jtok); } } -/*========================= reading related code ================================*/ -static int RemoteReadCallback(int handle, void *userData) -{ - int length; - char *pPtr, *pStart, *pEnd; - - pPtr = ANETreadPtr(handle,&length); - - /* - * deal with command results - */ - pStart = strstr(pPtr, "TRANSACTIONSTART"); - pEnd = strstr(pPtr,"TRANSACTIONEND"); - if(pStart != NULL && pEnd != NULL){ - pStart = pStart + strlen("TRANSACTIONSTART"); - *pEnd = '\0'; - traceIO("RO","Received command - reply: %s", pStart); - pEnd += strlen("TRANSACTIONEND"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with update messages - */ - pStart = strstr(pPtr, "SROC:"); - pEnd = strstr(pPtr,":EROC\r\n"); - if(pStart != NULL && pEnd != NULL){ - pStart += strlen("SROC:"); - *pEnd = '\0'; - InterpExecute(pServ->pSics, pServ->dummyCon,pStart); - traceIO("RO", "Received %s from remote", pStart); - pEnd += strlen("EROC\r\n"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with heartbeats - */ - if((pStart = strstr(pPtr,"Poch")) != NULL){ - ANETreadConsume(handle,(pStart+4) - pPtr); - } - - /* - If there is more stuff to process: recurse - */ - pPtr = ANETreadPtr(handle,&length); - if(length > 0 && - ( strstr(pPtr,":EROC\r\n") != NULL || - strstr(pPtr,"TRANSACTIONEND") != NULL - || strstr(pPtr,"Poch") != NULL ) ) { - RemoteReadCallback(handle,userData); - } - - return 1; -} /*-----------------------------------------------------------------------------*/ static int transactCommand(int handle, char *command, char *reply, int replyLen) { @@ -135,7 +95,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) char *prefix = {"transact "}; int status, length, type; time_t start; - char *pPtr; + char *pPtr, *pEnd; /* * read possible dirt of the line @@ -144,12 +104,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) ANETreadConsume(handle,length); - toSend = malloc(strlen(command) + strlen(prefix) + 1); + toSend = malloc(strlen(command) + strlen(prefix) + 10); if(toSend == NULL){ return OOM; } strcpy(toSend, prefix); strcat(toSend, command); + if(strstr(command,"\n") == NULL){ + strcat(toSend,"\r\n"); + } status = ANETwrite(handle,toSend,strlen(toSend)); free(toSend); if(status != 1){ @@ -163,7 +126,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) while(time(NULL) < start + 2.0){ ANETprocess(); pPtr = ANETreadPtr(handle,&length); - if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ + if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){ + *pEnd = '\0'; strncpy(reply,pPtr,replyLen); ANETreadConsume(handle,length); return 1; @@ -191,9 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->readHandle = ANETconnect(self->host, self->port); - self->writeHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0){ + self->handle = ANETconnect(self->host, self->port); + self->transactHandle = ANETconnect(self->host, self->port); + if(self->handle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -206,23 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self) Default login with hard coded manager login. Defined in nserver.c */ - ANETwrite(self->readHandle,login,strlen(login)); - ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->handle,login,strlen(login)); + ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ - pPtr = ANETreadPtr(self->readHandle, &length); - ANETreadConsume(self->readHandle,length); - pPtr = ANETreadPtr(self->writeHandle, &length); - ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->handle, &length); + ANETreadConsume(self->handle,length); + pPtr = ANETreadPtr(self->transactHandle, &length); + ANETreadConsume(self->transactHandle,length); + transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1); - /* - * install the read callback - */ - ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); /* * Remove geterror on read nodes and reinstall callbacks for reconnects @@ -233,17 +194,15 @@ static void ConnectRemoteObject(pRemoteOBJ self) node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror",NULL); - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); } status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)); - + self->connected = 1; - self->writeInUse = 0; } /*-----------------------------------------------------------------------------*/ static void MarkDisconnected(pRemoteOBJ self) @@ -270,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, pUpdateCallback uppi = (pUpdateCallback)userData; hdbDataMessage *mm = NULL; pDynString text; - char *prefix = {"SROC:hupdate "}; - char *postfix= {":EROC\r\n"}; + char *prefix = {"hupdate "}; + char *postfix= {" \r\n"}; char *txt = NULL; int length; pHdbPropertyChange propChange = NULL; @@ -309,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, if(!SCisConnected(uppi->sendCon)){ return hdbKill; } - length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + + length = strlen("hdelprop ") + strlen(uppi->remotePath) + strlen(propChange->key) + 10; if(propChange->value != NULL){ length += strlen(propChange->value); @@ -319,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, return hdbContinue; } if(propChange->value == NULL){ - snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, + snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath, propChange->key,postfix); } else { - snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, + snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath, propChange->key,propChange->value, postfix); } SCWrite(uppi->sendCon,txt,eValue); @@ -397,7 +356,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ /* * try a reconnect, @@ -407,7 +366,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) */ self->connected = 0; ConnectRemoteObject(self); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -448,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Install a callback on the remote node to update the master. The remote should * then immediatly send an update which will be processed by the read callback. */ - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); return 1; } @@ -492,10 +451,10 @@ static int HeartbeatTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; int status; - char command[] = {"Poch\r\n"}; + char command[] = {"contextdo 8437 Poch\r\n"}; if (time(NULL) > self->nextHeartbeat){ - status = ANETwrite(self->readHandle,command, strlen(command)); + status = ANETwrite(self->handle,command, strlen(command)); if(status != 1){ traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); self->connected = 0; @@ -509,11 +468,13 @@ static int HeartbeatTask(void *pData) return 1; } /*============================= writing related code =========================== - The logic here is to use the standard writeHandle when available. I expect most - communication to be short and to happen through the writeHandle. If that one is - in use, a new connection will be built. - --------------------------------------------------------------------------------- - suppress all superfluous OK from the slave + This works by sending the command via contextdo with a ID > 10^6. This causes + the remote SICS to send the termination messages. The transaction IDs together + with the connection responsible for it are kept in a list. + + This list is used by the write task to forward messages properly and for handling + termination. + -----------------------------------------------------------------------------------*/ #include static OutCode findOutCode(char *txt) @@ -527,93 +488,153 @@ static OutCode findOutCode(char *txt) } return eValue; } -/*--------------------------------------------------------------------------------*/ -static void printSICS(char *answer, SConnection *pCon) +/*-----------------------------------------------------------------------------------*/ +static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText) { - char line[1024], *pPtr, *pCode; - OutCode eCode; + int status; + writeData WD; - pPtr = answer; - while(pPtr != NULL){ - memset(line,0,sizeof(line)); - pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL){ - pCode = strstr(line,"@@"); - if(pCode != NULL){ - *pCode = '\0'; - pCode += 2; - eCode = findOutCode(trim(pCode)); - + status = LLDnodePtr2First(writeList); + while(status == 1){ + LLDblobData(writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { + /* skip */ + return; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(writeList); + LLDblobAppend(writeList,&WD, sizeof(writeData)); + return; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; } else { - eCode = eValue; + if(strstr(pText,"OK") == NULL){ + SCWrite(WD.pCon,pText,eOut); + } + return; } - SCWrite(pCon,line,eCode); } + status = LLDnodePtr2Next(writeList); } -} -/*---------------------------------------------------------------------------------*/ -static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) + } +/*-----------------------------------------------------------------------------------*/ +static int WriteResponseTask(void *pData) { - int handle, length; - char *answer = NULL; - char command[80]; + pRemoteOBJ self = (pRemoteOBJ)pData; + int status, length = 0, transID; + char *pText, *outTxt; + json_object *message = NULL, *data = NULL; + enum json_tokener_error tokerr; + OutCode eOut; + writeData WD; - if(self->writeInUse) { - handle = ANETconnect(self->host,self->port); - if(handle < 0){ - traceIO("RO","Failed to connect to %s at %d", self->host, self->port); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); - } - return handle; - } - ANETwrite(handle,login,strlen(login)); - usleep(500); - ANETprocess(); - /* - eat the login responses - */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); - *newHandle = 1; - - transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); - - } else { - self->writeInUse = 1; - handle = self->writeHandle; - /* - eat dirt from the line - */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); + if(!ANETvalidHandle(self->handle)) { + return 1; } - return handle; + + pText = ANETreadPtr(self->handle,&length); + while(length > 0){ + json_tokener_reset(self->jtok); + message = json_tokener_parse_ex(self->jtok,pText,length); + tokerr = self->jtok->err; + if(tokerr == json_tokener_continue){ + return 1; + } else if(tokerr != json_tokener_success) { + traceIO("RO","JSON parsing error %s on %s from %s %d", + json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); + ANETreadConsume(self->handle,length); + return 1; + } + if(json_object_get_type(message) != json_type_object) { + traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); + ANETreadConsume(self->handle,length); + return 1; + } + /* + we need to consume here what has been parsed. + The char_offset in the tokenizer structure might tell us that... + */ + ANETreadConsume(self->handle,self->jtok->char_offset); + + + /* + Received a valid message, process + */ + data = json_object_object_get(message,"trans"); + if(data == NULL){ + traceIO("RO","No transaction ID found in %s from %s", pText,self->host); + return 1; + } + transID = json_object_get_int(data); + + data = json_object_object_get(message,"flag"); + if(data == NULL){ + traceIO("RO","No flag found in %s from %s", pText,self->host); + return 1; + } + outTxt = (char *)json_object_get_string(data); + eOut = findOutCode(outTxt); + + data = json_object_object_get(message,"data"); + if(data == NULL){ + traceIO("RO","No data found in %s from %s", pText,self->host); + return 1; + } + pText = (char *)json_object_get_string(data); + + traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); + + /* + do nothing on Poch + */ + if(transID == POCHACT){ + pText = ANETreadPtr(self->handle,&length); + json_object_put(message); + continue; + } + + /* + process update messages + */ + if(transID == READACT){ + if(strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + } + traceIO("RO","Received %s from remote",pText); + pText = ANETreadPtr(self->handle,&length); + json_object_put(message); + continue; + } + + /* + check write List + */ + CheckWriteList(self->writeList,transID,eOut,pText); + json_object_put(message); + + pText = ANETreadPtr(self->handle,&length); + } + + return 1; } + /*---------------------------------------------------------------------------------*/ -static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) +static int IncrementTransactionID() { - char *answer = NULL, *pEnd, *command = NULL; - int length; - - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - printSICS(answer,pCon); - } - traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,length); - break; - } + transactionID++; + if(transactionID >= 200000){ + transactionID = 100000; } - + return transactionID; } /*---------------------------------------------------------------------------------*/ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, @@ -626,14 +647,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, pDynString data; char *remoteNode; char *command, *answer, *pEnd; - + writeData WD; if((mm = GetHdbSetMessage(mes)) != NULL){ pCon = (SConnection *)mm->callData; - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return hdbAbort; - } /* build the command to send @@ -648,53 +665,32 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); + WD.pCon = SCCopyConnection(pCon); + WD.waitTask = 0; + WD.transID = IncrementTransactionID(); + snprintf(command,length,"contextdo %d hset %s %s\r\n", + WD.transID, remoteNode, GetCharArray(data)); /* write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); - status = ANETwrite(handle,command,strlen(command)); + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); + status = ANETwrite(self->handle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); - } - return hdbAbort; - } - - /* - wait for a response: TRANSACTIONFINISHED - */ - ProcessWriteResponse(self,handle,pCon); - - /* - Is there a termination script? - */ - command = GetHdbProp(currentNode,"termscript"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - Tcl_Eval(InterpGetTcl(pServ->pSics),command); - answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); - if(strstr(answer,"idle") != NULL){ - answer = ANETreadPtr(handle,&length); - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } + self->connected = 0; + ConnectRemoteObject(self); + if(self->connected == 0){ + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", + remoteNode, self->host); + } + return hdbAbort; } } - - - if(newHandle){ - ANETclose(handle); - } else { - self->writeInUse = 0; - } - + return hdbContinue; } return hdbContinue; @@ -722,12 +718,17 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) SetHdbProperty(localNode,"remotewrite",rd.remoteNode); AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, self,NULL)); + /* + TODO: The connected write nodes should be held in a list in order to be able to + remove the write callbacks when deleting the remote object. As removing remote + objects usually only happens when SICS shuts down this is not so important. + */ /* * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -784,61 +785,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, /*============================ remote execute =================================*/ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) { - int status, handle, newHandle = 0, length; - char *answer, *pEnd; - - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return 0; - } + int status; + char answer[65536]; /* write, thereby taking care to prefix with transact and for proper termination */ - if(strstr(command,"transact") == NULL){ - ANETwrite(handle,"transact ", sizeof("transact ")); - } - status = ANETwrite(handle,command,strlen(command)); - if(strstr(command,"\n") == NULL){ - ANETwrite(handle,"\r\n",2); - } - if(status < 0){ - traceIO("RO","Disconnect from %s while executing %s", self->host, command); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - return 0; - } - - /* - wait for response - */ - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - SCPrintf(pCon,eValue,answer); - } - ANETreadConsume(handle,length); - break; - } - } - - if(newHandle){ - ANETclose(handle); + memset(answer,0,sizeof(answer)-1); + status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); + if(status == 1){ + SCWrite(pCon,answer,eValue); } else { - self->writeInUse = 0; + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } - return 1; + return status; } /*------------------------------------------------------------------------------*/ static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, @@ -920,6 +881,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, self->host = strdup(argv[2]); self->port = atoi(argv[3]); self->readList = LLDblobCreate(); + self->writeList = LLDblobCreate(); + self->jtok = json_tokener_new(); ConnectRemoteObject(self); cmd = AddSICSHdbPar(pNew->objectNode, @@ -949,6 +912,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); + snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port); + TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; } diff --git a/scriptcontext.c b/scriptcontext.c index e1152fc3..b63e565b 100644 --- a/scriptcontext.c +++ b/scriptcontext.c @@ -553,7 +553,7 @@ static char *SctActionHandler(void *actionData, char *lastReply, } else { l = strlen(origScript); } - snprintf(eprop, sizeof eprop, "error_in_%.*s", l, origScript); + snprintf(eprop, sizeof eprop, "error_in_%s", origScript); emsg = GetHdbProp(node, eprop); cnt = 0; if (emsg != NULL) { diff --git a/sicsobj.c b/sicsobj.c index 330e5933..8f08d396 100644 --- a/sicsobj.c +++ b/sicsobj.c @@ -477,7 +477,6 @@ int InvokeSICSOBJ(SConnection * pCon, SicsInterp * pSics, void *pData, status = GetHdbProperty(parNode,"geterror",buffer,sizeof(buffer)); if (status == 1 && strstr(buffer,"none") == NULL){ SCPrintf(pCon,eValue,"ERROR: %s on last read of %s", buffer, argv[0]); - SCPrintf(pCon,eValue,"%s = -99999", argv[0]); return 0; } status = GetHipadabaPar(parNode, &data, pCon); diff --git a/stptok.h b/stptok.h index e4533d7f..6e03e451 100644 --- a/stptok.h +++ b/stptok.h @@ -1,14 +1,14 @@ -/* -** stptok() -- public domain by Ray Gardner, modified by Bob Stout -** -** You pass this function a string to parse, a buffer to receive the -** "token" that gets scanned, the length of the buffer, and a string of -** "break" characters that stop the scan. It will copy the string into -** the buffer up to any of the break characters, or until the buffer is -** full, and will always leave the buffer null-terminated. It will -** return a pointer to the first non-breaking character after the one -** that stopped the scan. -*/ +/* +** stptok() -- public domain by Ray Gardner, modified by Bob Stout +** +** You pass this function a string to parse, a buffer to receive the +** "token" that gets scanned, the length of the buffer, and a string of +** "break" characters that stop the scan. It will copy the string into +** the buffer up to any of the break characters, or until the buffer is +** full, and will always leave the buffer null-terminated. It will +** return a pointer to the first non-breaking character after the one +** that stopped the scan. +*/ #ifndef STPSTPTOK #define STPSTPTOK char *stptok(const char *s, char *tok, size_t toklen, char *brk);