diff --git a/conman.c b/conman.c index f9ca2242..e2b993ac 100644 --- a/conman.c +++ b/conman.c @@ -1744,7 +1744,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) memset(pBueffel, 0, 80); stptok(trim(pCommand), pBueffel, 79, " "); self->iCmdCtr++; - if (999999 < self->iCmdCtr) { + if (self->iCmdCtr > 99998) { self->iCmdCtr = 0; } self->transID = self->iCmdCtr; diff --git a/interface.c b/interface.c index f3b9790f..6f84a604 100644 --- a/interface.c +++ b/interface.c @@ -241,10 +241,10 @@ static int DriveTaskFunc(void *data) } else { ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); - } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); + } return 0; } /*--------------------------------------------------------------------------*/ @@ -275,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget) ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); InvokeNewTarget(pServ->pExecutor,name,fTarget); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = DRIVEID; taskData->obj = obj; @@ -396,8 +399,8 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); } return 0; } @@ -424,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name) } ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); + if(pCon->transID > 100000) { + 
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = COUNTID; taskData->obj = obj; diff --git a/protocol.c b/protocol.c index 9891913d..7908bec0 100644 --- a/protocol.c +++ b/protocol.c @@ -188,7 +188,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData, SCWrite(pCon, "ERROR: no more memory", eError); return 0; } + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID); + } status = InterpExecute(pSics, comCon, command); + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMEND %d", comCon->transID); + } if (command != buffer) free(command); SCDeleteConnection(comCon); diff --git a/remoteobject.c b/remoteobject.c index c1912769..b6ec41ee 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -2,7 +2,7 @@ * Remote objects in sicsobj. This means accessing remote objects in a different * SICS server from a master SICS server. * - * Reading is implementd according to this scheme: + * Reading is implemented according to this scheme: * * * When a read connection is made between a local node and a remote node in slave, then a * callback is installed on remote node in slave. 
@@ -31,22 +31,27 @@ #include #include #include +#include #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); + +static int transactionID = 100000; /*---------------------- our very private data structure -------------------*/ typedef struct { char *host; int port; int readHandle; int writeHandle; - int writeInUse; + int transactHandle; int readList; + int writeList; unsigned int connected; time_t nextHeartbeat; + struct json_tokener *jtok; } RemoteOBJ, *pRemoteOBJ; /*----------------------------------------------------------------------------*/ typedef struct { @@ -59,6 +64,12 @@ typedef struct { char *remotePath; } UpdateCallback, *pUpdateCallback; /*----------------------------------------------------------------------------*/ +typedef struct { + int transID; + SConnection *pCon; + int waitTask; +}writeData, *pWriteData; +/*----------------------------------------------------------------------------*/ void KillRemoteOBJ(void *data) { char roTaskName[132]; @@ -70,7 +81,10 @@ void KillRemoteOBJ(void *data) free(self->host); ANETclose(self->readHandle); ANETclose(self->writeHandle); + ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); + LLDdeleteBlob(self->writeList); + json_tokener_free(self->jtok); } } /*========================= reading related code ================================*/ @@ -135,7 +149,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) char *prefix = {"transact "}; int status, length, type; time_t start; - char *pPtr; + char *pPtr, *pEnd; /* * read possible dirt of the line @@ -144,12 +158,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) ANETreadConsume(handle,length); - toSend = malloc(strlen(command) + strlen(prefix) + 1); + toSend = malloc(strlen(command) + strlen(prefix) + 10); if(toSend == NULL){ return OOM; 
} strcpy(toSend, prefix); strcat(toSend, command); + if(strstr(command,"\n") == NULL){ + strcat(toSend,"\r\n"); + } status = ANETwrite(handle,toSend,strlen(toSend)); free(toSend); if(status != 1){ @@ -163,7 +180,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) while(time(NULL) < start + 2.0){ ANETprocess(); pPtr = ANETreadPtr(handle,&length); - if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ + if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){ + *pEnd = '\0'; strncpy(reply,pPtr,replyLen); ANETreadConsume(handle,length); return 1; @@ -193,7 +211,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0){ + self->transactHandle = ANETconnect(self->host, self->port); + if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -208,6 +227,7 @@ static void ConnectRemoteObject(pRemoteOBJ self) */ ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* @@ -217,6 +237,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) ANETreadConsume(self->readHandle,length); pPtr = ANETreadPtr(self->writeHandle, &length); ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->transactHandle, &length); + ANETreadConsume(self->transactHandle,length); /* @@ -240,11 +262,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); - transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1); + transactCommand(self->writeHandle,"protocol 
set json\r\n", command,sizeof(command)-1); self->connected = 1; - self->writeInUse = 0; } /*-----------------------------------------------------------------------------*/ static void MarkDisconnected(pRemoteOBJ self) @@ -398,7 +418,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ /* * try a reconnect, @@ -408,7 +428,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) */ self->connected = 0; ConnectRemoteObject(self); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -510,11 +530,13 @@ static int HeartbeatTask(void *pData) return 1; } /*============================= writing related code =========================== - The logic here is to use the standard writeHandle when available. I expect most - communication to be short and to happen through the writeHandle. If that one is - in use, a new connection will be built. - --------------------------------------------------------------------------------- - suppress all superfluous OK from the slave + This works by sending the command via contextdo with an ID > 100000. This causes + the remote SICS to send the termination messages. The transaction IDs together + with the connection responsible for it are kept in a list. + + This list is used by the write task to forward messages properly and for handling + termination. 
+ -----------------------------------------------------------------------------------*/ #include static OutCode findOutCode(char *txt) @@ -528,94 +550,111 @@ static OutCode findOutCode(char *txt) } return eValue; } -/*--------------------------------------------------------------------------------*/ -static void printSICS(char *answer, SConnection *pCon) +/*-----------------------------------------------------------------------------------*/ +static int WriteResponseTask(void *pData) { - char line[1024], *pPtr, *pCode; - OutCode eCode; + pRemoteOBJ self = (pRemoteOBJ)pData; + int status, length = 0, transID; + char *pText, *outTxt; + json_object *message = NULL, *data = NULL; + enum json_tokener_error tokerr; + OutCode eOut; + writeData WD; - pPtr = answer; - while(pPtr != NULL){ - memset(line,0,sizeof(line)); - pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){ - pCode = strstr(line,"@@"); - if(pCode != NULL){ - *pCode = '\0'; - pCode += 2; - eCode = findOutCode(trim(pCode)); - - } else { - eCode = eValue; - } - SCWrite(pCon,line,eCode); - } + if(!ANETvalidHandle(self->writeHandle)) { + return 1; } -} -/*---------------------------------------------------------------------------------*/ -static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) -{ - int handle, length; - char *answer = NULL; - char command[80]; - if(self->writeInUse) { - handle = ANETconnect(self->host,self->port); - if(handle < 0){ - traceIO("RO","Failed to connect to %s at %d", self->host, self->port); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); - } - return handle; + pText = ANETreadPtr(self->writeHandle,&length); + if(length > 0){ + json_tokener_reset(self->jtok); + message = json_tokener_parse_ex(self->jtok,pText,length); + tokerr = self->jtok->err; + if(tokerr == json_tokener_continue){ + return 1; + } else if(tokerr != json_tokener_success) { 
+ traceIO("RO","JSON parsing error %s on %s from %s", + json_tokener_errors[tokerr], pText, self->host); + ANETreadConsume(self->writeHandle,length); + return 1; + } + if(json_object_get_type(message) != json_type_object) { + traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); + ANETreadConsume(self->writeHandle,length); + return 1; } - ANETwrite(handle,login,strlen(login)); - usleep(500); - ANETprocess(); /* - eat the login responses + we need to consume here what has been parsed. + The char_offset in the tokenizer structure might tell us that... */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); - *newHandle = 1; + ANETreadConsume(self->writeHandle,self->jtok->char_offset); - transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); - transactCommand(handle,"config remote\r\n",command,sizeof(command)-1); - } else { - self->writeInUse = 1; - handle = self->writeHandle; /* - eat dirt from the line + Received a valid message, process */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); + data = json_object_object_get(message,"trans"); + if(data == NULL){ + traceIO("RO","No transaction ID found in %s from %s", pText,self->host); + return 1; + } + transID = json_object_get_int(data); + + data = json_object_object_get(message,"flag"); + if(data == NULL){ + traceIO("RO","No flag found in %s from %s", pText,self->host); + return 1; + } + outTxt = (char *)json_object_get_string(data); + eOut = findOutCode(outTxt); + + data = json_object_object_get(message,"data"); + if(data == NULL){ + traceIO("RO","No data found in %s from %s", pText,self->host); + return 1; + } + pText = (char *)json_object_get_string(data); + + status = LLDnodePtr2First(self->writeList); + while(status == 1){ + LLDblobData(self->writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + 
SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(self->writeList); + LLDblobAppend(self->writeList,&WD, sizeof(writeData)); + return 1; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else { + SCWrite(WD.pCon,pText,eOut); + return 1; + } + } + status = LLDnodePtr2Next(self->writeList); + } + } - return handle; + + return 1; } + /*---------------------------------------------------------------------------------*/ -static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) +static int IncrementTransactionID() { - char *answer = NULL, *pEnd, *command = NULL; - int length; - - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - printSICS(answer,pCon); - } - traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer); - break; - } + transactionID++; + if(transactionID >= 200000){ + transactionID = 100000; } - + return transactionID; } /*---------------------------------------------------------------------------------*/ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, @@ -628,14 +667,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, pDynString data; char *remoteNode; char *command, *answer, *pEnd; - + writeData WD; if((mm = GetHdbSetMessage(mes)) != NULL){ pCon = (SConnection *)mm->callData; - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return hdbAbort; - } /* build the command to send @@ -650,13 +685,17 @@ static hdbCallbackReturn 
ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); + WD.pCon = SCCopyConnection(pCon); + WD.waitTask = 0; + WD.transID = IncrementTransactionID(); + snprintf(command,length,"contextdo %d hset %s %s\r\n", + WD.transID, remoteNode, GetCharArray(data)); /* write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); - status = ANETwrite(handle,command,strlen(command)); + status = ANETwrite(self->writeHandle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -665,55 +704,8 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - - /* - wait for a response: TRANSACTIONFINISHED - */ - ProcessWriteResponse(self,handle,pCon); - - /* - Is there a termination script? - */ - command = GetHdbProp(currentNode,"termscript"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - Tcl_Eval(InterpGetTcl(pServ->pSics),command); - answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); - if(strstr(answer,"idle") != NULL){ - answer = ANETreadPtr(handle,&length); - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - /* - Do I have to wait for a TASKFINISHED? 
- */ - command = GetHdbProp(currentNode,"taskwait"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - answer = ANETreadPtr(handle,&length); - if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){ - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - - if(newHandle){ - ANETclose(handle); - } else { - self->writeInUse = 0; - } - + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); + return hdbContinue; } return hdbContinue; @@ -746,7 +738,7 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -803,61 +795,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, /*============================ remote execute =================================*/ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) { - int status, handle, newHandle = 0, length; - char *answer, *pEnd; - - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return 0; - } + int status; + char answer[65536]; /* write, thereby taking care to prefix with transact and for proper termination */ - if(strstr(command,"transact") == NULL){ - ANETwrite(handle,"transact ", sizeof("transact ")); - } - status = ANETwrite(handle,command,strlen(command)); - if(strstr(command,"\n") == NULL){ - ANETwrite(handle,"\r\n",2); - } - if(status < 0){ - traceIO("RO","Disconnect from %s while executing %s", self->host, command); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - return 0; - } 
- - /* - wait for response - */ - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - SCPrintf(pCon,eValue,answer); - } - ANETreadConsume(handle,length); - break; - } - } - - if(newHandle){ - ANETclose(handle); + memset(answer,0,sizeof(answer)-1); + status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); + if(status){ + SCWrite(pCon,answer,eValue); } else { - self->writeInUse = 0; + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } - return 1; + return status; } /*------------------------------------------------------------------------------*/ static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, @@ -939,6 +891,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, self->host = strdup(argv[2]); self->port = atoi(argv[3]); self->readList = LLDblobCreate(); + self->writeList = LLDblobCreate(); + self->jtok = json_tokener_new(); ConnectRemoteObject(self); cmd = AddSICSHdbPar(pNew->objectNode, @@ -968,6 +922,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); + snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; }