From 0d0c90cee7da6aff92f34183a56f66c704b54936 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 19 Mar 2015 16:12:35 +0100 Subject: [PATCH 1/5] Another try ad getting write to work --- conman.c | 9 ++++++++- conman.h | 1 + interface.c | 6 ++++++ remoteobject.c | 27 +++++++++++++++++++++++---- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/conman.c b/conman.c index 88b2ad1f..f9ca2242 100644 --- a/conman.c +++ b/conman.c @@ -229,6 +229,7 @@ static SConnection *CreateConnection(SicsInterp * pSics) pRes->conStart = time(NULL); pRes->write = SCNormalWrite; pRes->runLevel = RUNDRIVE; + pRes->remote = 0; /* initialise context variables */ pRes->iCmdCtr = 0; @@ -461,6 +462,7 @@ SConnection *SCCopyConnection(SConnection * pCon) result->iList = -1; result->runLevel = pCon->runLevel; result->data = pCon->data; + result->remote = pCon->remote; return result; } @@ -1768,6 +1770,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) config File Filename Logs to another file config output normal | withcode | ACT Sets output mode config listen 0 | 1 enables commandlog listen mode + config remote sets the remote connection flag ---------------------------------------------------------------------------*/ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, @@ -1825,7 +1828,11 @@ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, SCSendOK(pCon); return 1; } - } + } else if(strcmp(argv[1],"remote") == 0) { + pMaster->remote = 1; + pCon->remote = 1; + return 1; + } /* check no or args */ if (argc < 3) { diff --git a/conman.h b/conman.h index 91befe22..99a9dd16 100644 --- a/conman.h +++ b/conman.h @@ -71,6 +71,7 @@ typedef struct __SConnection { pCosta pStack; /* stack of pending commands */ int contextStack; /* context stack: may go? */ mkChannel *pSock; /* for temporary backwards compatability */ + int remote; /* true if this is a remote object connection */ } SConnection; #include "nserver.h" diff --git a/interface.c b/interface.c index d661b2c3..f3b9790f 100644 --- a/interface.c +++ b/interface.c @@ -241,6 +241,9 @@ static int DriveTaskFunc(void *data) } else { ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } + if(taskData->pCon->remote){ + SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); return 0; } @@ -393,6 +396,9 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->remote){ + SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + } return 0; } /*--------------------------------------------------------------------------*/ diff --git a/remoteobject.c b/remoteobject.c index 5cad0a91..c1912769 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -240,8 +240,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)); - + transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); + transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1); + self->connected = 1; self->writeInUse = 0; } @@ -537,7 +538,7 @@ static void printSICS(char *answer, SConnection *pCon) while(pPtr != NULL){ memset(line,0,sizeof(line)); pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL){ + if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){ pCode = strstr(line,"@@"); if(pCode != NULL){ *pCode = '\0'; @@ -578,6 +579,7 @@ static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle *newHandle = 1; transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); + transactCommand(handle,"config remote\r\n",command,sizeof(command)-1); } else { self->writeInUse = 1; @@ -609,7 +611,7 @@ static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) printSICS(answer,pCon); } traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,length); + ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer); break; } } @@ -688,6 +690,23 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } } + /* + Do I have to wait for a TASKFINISHED? + */ + command = GetHdbProp(currentNode,"taskwait"); + if(command != NULL){ + while(1) { + TaskYield(pServ->pTasker); + answer = ANETreadPtr(handle,&length); + if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){ + printSICS(answer,pCon); + traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); + ANETreadConsume(handle,length); + break; + } + } + } + if(newHandle){ ANETclose(handle); From b7eaa538ed90cbad11d6c0b7b037600594a18b73 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 23 Apr 2015 15:52:05 +0200 Subject: [PATCH 2/5] - Rewrote the write part of remoteobject - With transactionID > 10^6, now termination messages are sent by contextdo and tasks - This still has bugs --- conman.c | 2 +- interface.c | 16 ++- protocol.c | 6 + remoteobject.c | 342 +++++++++++++++++++++---------------------------- 4 files changed, 167 insertions(+), 199 deletions(-) diff --git a/conman.c b/conman.c index f9ca2242..e2b993ac 100644 --- a/conman.c +++ b/conman.c @@ -1744,7 +1744,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) memset(pBueffel, 0, 80); stptok(trim(pCommand), pBueffel, 79, " "); self->iCmdCtr++; - if (999999 < self->iCmdCtr) { + if (self->iCmdCtr > 99998) { self->iCmdCtr = 0; } self->transID = self->iCmdCtr; diff --git a/interface.c b/interface.c index f3b9790f..6f84a604 100644 --- a/interface.c +++ b/interface.c @@ -241,10 +241,10 @@ static int DriveTaskFunc(void *data) } else { ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); - } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); + } return 0; } /*--------------------------------------------------------------------------*/ @@ -275,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget) ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); InvokeNewTarget(pServ->pExecutor,name,fTarget); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = DRIVEID; taskData->obj = obj; @@ -396,8 +399,8 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); } return 0; } @@ -424,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name) } ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = COUNTID; taskData->obj = obj; diff --git a/protocol.c b/protocol.c index 9891913d..7908bec0 100644 --- a/protocol.c +++ b/protocol.c @@ -188,7 +188,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData, SCWrite(pCon, "ERROR: no more memory", eError); return 0; } + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID); + } status = InterpExecute(pSics, comCon, command); + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMEND %d", comCon->transID); + } if (command != buffer) free(command); SCDeleteConnection(comCon); diff --git a/remoteobject.c b/remoteobject.c index c1912769..b6ec41ee 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -2,7 +2,7 @@ * Remote objects in sicsobj. This means accessing remote objects in a different * SICS server from a master SICS server. * - * Reading is implementd according to this scheme: + * Reading is implemented according to this scheme: * * * When a read connection is made between a local node and a remote node in slave, then a * callback is installed on remote node in slave. @@ -31,22 +31,27 @@ #include #include #include +#include #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); + +static int transactionID = 100000; /*---------------------- our very private data structure -------------------*/ typedef struct { char *host; int port; int readHandle; int writeHandle; - int writeInUse; + int transactHandle; int readList; + int writeList; unsigned int connected; time_t nextHeartbeat; + struct json_tokener *jtok; } RemoteOBJ, *pRemoteOBJ; /*----------------------------------------------------------------------------*/ typedef struct { @@ -59,6 +64,12 @@ typedef struct { char *remotePath; } UpdateCallback, *pUpdateCallback; /*----------------------------------------------------------------------------*/ +typedef struct { + int transID; + SConnection *pCon; + int waitTask; +}writeData, *pWriteData; +/*----------------------------------------------------------------------------*/ void KillRemoteOBJ(void *data) { char roTaskName[132]; @@ -70,7 +81,10 @@ void KillRemoteOBJ(void *data) free(self->host); ANETclose(self->readHandle); ANETclose(self->writeHandle); + ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); + LLDdeleteBlob(self->writeList); + json_tokener_free(self->jtok); } } /*========================= reading related code ================================*/ @@ -135,7 +149,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) char *prefix = {"transact "}; int status, length, type; time_t start; - char *pPtr; + char *pPtr, *pEnd; /* * read possible dirt of the line @@ -144,12 +158,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) ANETreadConsume(handle,length); - toSend = malloc(strlen(command) + strlen(prefix) + 1); + toSend = malloc(strlen(command) + strlen(prefix) + 10); if(toSend == NULL){ return OOM; } strcpy(toSend, prefix); strcat(toSend, command); + if(strstr(command,"\n") == NULL){ + strcat(toSend,"\r\n"); + } status = ANETwrite(handle,toSend,strlen(toSend)); free(toSend); if(status != 1){ @@ -163,7 +180,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) while(time(NULL) < start + 2.0){ ANETprocess(); pPtr = ANETreadPtr(handle,&length); - if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ + if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){ + *pEnd = '\0'; strncpy(reply,pPtr,replyLen); ANETreadConsume(handle,length); return 1; @@ -193,7 +211,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0){ + self->transactHandle = ANETconnect(self->host, self->port); + if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -208,6 +227,7 @@ static void ConnectRemoteObject(pRemoteOBJ self) */ ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* @@ -217,6 +237,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) ANETreadConsume(self->readHandle,length); pPtr = ANETreadPtr(self->writeHandle, &length); ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->transactHandle, &length); + ANETreadConsume(self->transactHandle,length); /* @@ -240,11 +262,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); - transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1); + transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1); self->connected = 1; - self->writeInUse = 0; } /*-----------------------------------------------------------------------------*/ static void MarkDisconnected(pRemoteOBJ self) @@ -398,7 +418,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ /* * try a reconnect, @@ -408,7 +428,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) */ self->connected = 0; ConnectRemoteObject(self); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -510,11 +530,13 @@ static int HeartbeatTask(void *pData) return 1; } /*============================= writing related code =========================== - The logic here is to use the standard writeHandle when available. I expect most - communication to be short and to happen through the writeHandle. If that one is - in use, a new connection will be built. - --------------------------------------------------------------------------------- - suppress all superfluous OK from the slave + This works by sending the command via contextdo with a ID > 10^6. This causes + the remote SICS to send the termination messages. The transaction IDs together + with the connection responsible for it are kept in a list. + + This list is used by the write task to forward messages properly and for handling + termination. + -----------------------------------------------------------------------------------*/ #include static OutCode findOutCode(char *txt) @@ -528,94 +550,111 @@ static OutCode findOutCode(char *txt) } return eValue; } -/*--------------------------------------------------------------------------------*/ -static void printSICS(char *answer, SConnection *pCon) +/*-----------------------------------------------------------------------------------*/ +static int WriteResponseTask(void *pData) { - char line[1024], *pPtr, *pCode; - OutCode eCode; + pRemoteOBJ self = (pRemoteOBJ)pData; + int status, length = 0, transID; + char *pText, *outTxt; + json_object *message = NULL, *data = NULL; + enum json_tokener_error tokerr; + OutCode eOut; + writeData WD; - pPtr = answer; - while(pPtr != NULL){ - memset(line,0,sizeof(line)); - pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){ - pCode = strstr(line,"@@"); - if(pCode != NULL){ - *pCode = '\0'; - pCode += 2; - eCode = findOutCode(trim(pCode)); - - } else { - eCode = eValue; - } - SCWrite(pCon,line,eCode); - } + if(!ANETvalidHandle(self->writeHandle)) { + return 1; } -} -/*---------------------------------------------------------------------------------*/ -static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) -{ - int handle, length; - char *answer = NULL; - char command[80]; - if(self->writeInUse) { - handle = ANETconnect(self->host,self->port); - if(handle < 0){ - traceIO("RO","Failed to connect to %s at %d", self->host, self->port); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); - } - return handle; + pText = ANETreadPtr(self->writeHandle,&length); + if(length > 0){ + json_tokener_reset(self->jtok); + message = json_tokener_parse_ex(self->jtok,pText,length); + tokerr = self->jtok->err; + if(tokerr == json_tokener_continue){ + return 1; + } else if(tokerr != json_tokener_success) { + traceIO("RO","JSON parsing error %s on %s from %s", + json_tokener_errors[tokerr], pText, self->host); + ANETreadConsume(self->writeHandle,length); + return 1; + } + if(json_object_get_type(message) != json_type_object) { + traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); + ANETreadConsume(self->writeHandle,length); + return 1; } - ANETwrite(handle,login,strlen(login)); - usleep(500); - ANETprocess(); /* - eat the login responses + we need to consume here what has been parsed. + The char_offset in the tokenizer structure might tell us that... */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); - *newHandle = 1; + ANETreadConsume(self->writeHandle,self->jtok->char_offset); - transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); - transactCommand(handle,"config remote\r\n",command,sizeof(command)-1); - } else { - self->writeInUse = 1; - handle = self->writeHandle; /* - eat dirt from the line + Received a valid message, process */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); + data = json_object_object_get(message,"trans"); + if(data == NULL){ + traceIO("RO","No transaction ID found in %s from %s", pText,self->host); + return 1; + } + transID = json_object_get_int(data); + + data = json_object_object_get(message,"flag"); + if(data == NULL){ + traceIO("RO","No flag found in %s from %s", pText,self->host); + return 1; + } + outTxt = (char *)json_object_get_string(data); + eOut = findOutCode(outTxt); + + data = json_object_object_get(message,"data"); + if(data == NULL){ + traceIO("RO","No data found in %s from %s", pText,self->host); + return 1; + } + pText = (char *)json_object_get_string(data); + + status = LLDnodePtr2First(self->writeList); + while(status == 1){ + LLDblobData(self->writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(self->writeList); + LLDblobAppend(self->writeList,&WD, sizeof(writeData)); + return 1; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else { + SCWrite(WD.pCon,pText,eOut); + return 1; + } + } + status = LLDnodePtr2Next(self->writeList); + } + } - return handle; + + return 1; } + /*---------------------------------------------------------------------------------*/ -static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) +static int IncrementTransactionID() { - char *answer = NULL, *pEnd, *command = NULL; - int length; - - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - printSICS(answer,pCon); - } - traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer); - break; - } + transactionID++; + if(transactionID >= 200000){ + transactionID = 100000; } - + return transactionID; } /*---------------------------------------------------------------------------------*/ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, @@ -628,14 +667,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, pDynString data; char *remoteNode; char *command, *answer, *pEnd; - + writeData WD; if((mm = GetHdbSetMessage(mes)) != NULL){ pCon = (SConnection *)mm->callData; - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return hdbAbort; - } /* build the command to send @@ -650,13 +685,17 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); + WD.pCon = SCCopyConnection(pCon); + WD.waitTask = 0; + WD.transID = IncrementTransactionID(); + snprintf(command,length,"contextdo %d hset %s %s\r\n", + WD.transID, remoteNode, GetCharArray(data)); /* write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); - status = ANETwrite(handle,command,strlen(command)); + status = ANETwrite(self->writeHandle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -665,55 +704,8 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - - /* - wait for a response: TRANSACTIONFINISHED - */ - ProcessWriteResponse(self,handle,pCon); - - /* - Is there a termination script? - */ - command = GetHdbProp(currentNode,"termscript"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - Tcl_Eval(InterpGetTcl(pServ->pSics),command); - answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); - if(strstr(answer,"idle") != NULL){ - answer = ANETreadPtr(handle,&length); - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - /* - Do I have to wait for a TASKFINISHED? - */ - command = GetHdbProp(currentNode,"taskwait"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - answer = ANETreadPtr(handle,&length); - if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){ - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - - if(newHandle){ - ANETclose(handle); - } else { - self->writeInUse = 0; - } - + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); + return hdbContinue; } return hdbContinue; @@ -746,7 +738,7 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -803,61 +795,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, /*============================ remote execute =================================*/ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) { - int status, handle, newHandle = 0, length; - char *answer, *pEnd; - - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return 0; - } + int status; + char answer[65536]; /* write, thereby taking care to prefix with transact and for proper termination */ - if(strstr(command,"transact") == NULL){ - ANETwrite(handle,"transact ", sizeof("transact ")); - } - status = ANETwrite(handle,command,strlen(command)); - if(strstr(command,"\n") == NULL){ - ANETwrite(handle,"\r\n",2); - } - if(status < 0){ - traceIO("RO","Disconnect from %s while executing %s", self->host, command); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - return 0; - } - - /* - wait for response - */ - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - SCPrintf(pCon,eValue,answer); - } - ANETreadConsume(handle,length); - break; - } - } - - if(newHandle){ - ANETclose(handle); + memset(answer,0,sizeof(answer)-1); + status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); + if(status){ + SCWrite(pCon,answer,eValue); } else { - self->writeInUse = 0; + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } - return 1; + return status; } /*------------------------------------------------------------------------------*/ static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, @@ -939,6 +891,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, self->host = strdup(argv[2]); self->port = atoi(argv[3]); self->readList = LLDblobCreate(); + self->writeList = LLDblobCreate(); + self->jtok = json_tokener_new(); ConnectRemoteObject(self); cmd = AddSICSHdbPar(pNew->objectNode, @@ -968,6 +922,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); + snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; } From aef2a36b605bdec186f520ba5a086eb0fd95ca9e Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Mon, 4 May 2015 08:02:37 +0200 Subject: [PATCH 3/5] Now, I fiddling with the message writitng code, I get all remote object messages but in the wrong order. They are sent in the right order, though. Have to chnage the code to use one connection asynchronously for everything only. --- conman.c | 29 ++++++++++++++++++++++++++--- remoteobject.c | 19 ++++++++++++------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/conman.c b/conman.c index 5a8542b0..d2d06705 100644 --- a/conman.c +++ b/conman.c @@ -74,6 +74,15 @@ #include "sicshipadaba.h" #include "protocol.h" #include "sicsvar.h" +#include + +/* + Greetings from protocol.c for SCLogWrite... +*/ +extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer, + int iOut); + + /* #define UUDEB 1 define UUDEB , for buffer writing for checking encoding */ @@ -1082,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; /* for commandlog tail */ if (!VerifyConnection(self)) { return 0; } - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1100,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == 3) { + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } @@ -1113,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; if (!VerifyConnection(self)) { return 0; @@ -1121,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) WriteToCommandLogId(NULL, self->sockHandle, buffer); SetSendingConnection(NULL); - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1133,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 2) { + } else if(self->iProtocolID == 2) { /* withcode */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1145,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == 3) { /* json */ + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } diff --git a/remoteobject.c b/remoteobject.c index b6ec41ee..2eb19559 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -17,7 +17,7 @@ * * COPRYRIGHT: see file COPYRIGHT * - * Mark Koennecke, February 2015 + * Mark Koennecke, February-May 2015 **/ #include #include @@ -209,8 +209,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); + self->readHandle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port); if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ self->connected = 0; @@ -566,15 +566,15 @@ static int WriteResponseTask(void *pData) } pText = ANETreadPtr(self->writeHandle,&length); - if(length > 0){ + while(length > 0){ json_tokener_reset(self->jtok); message = json_tokener_parse_ex(self->jtok,pText,length); tokerr = self->jtok->err; if(tokerr == json_tokener_continue){ return 1; } else if(tokerr != json_tokener_success) { - traceIO("RO","JSON parsing error %s on %s from %s", - json_tokener_errors[tokerr], pText, self->host); + traceIO("RO","JSON parsing error %s on %s from %s %d", + json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); ANETreadConsume(self->writeHandle,length); return 1; } @@ -614,6 +614,8 @@ static int WriteResponseTask(void *pData) return 1; } pText = (char *)json_object_get_string(data); + + traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); status = LLDnodePtr2First(self->writeList); while(status == 1){ @@ -625,6 +627,9 @@ static int WriteResponseTask(void *pData) SCDeleteConnection(WD.pCon); LLDblobDelete(self->writeList); return 1; + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { + /* skip */ + return 1; } else if(strstr(pText,"TASKSTART") != NULL){ WD.waitTask = 1 ; LLDblobDelete(self->writeList); @@ -641,7 +646,7 @@ static int WriteResponseTask(void *pData) } status = LLDnodePtr2Next(self->writeList); } - + pText = ANETreadPtr(self->writeHandle,&length); } return 1; @@ -695,6 +700,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); status = ANETwrite(self->writeHandle,command,strlen(command)); free(command); DeleteDynString(data); @@ -704,7 +710,6 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - LLDblobAppend(self->writeList,&WD,sizeof(writeData)); return hdbContinue; } From 9517a6d14e827bea53bf02d8f43d4bac48e616fd Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Tue, 12 May 2015 08:36:38 +0200 Subject: [PATCH 4/5] Switched remoteobject to use a single connection with ACT --- conman.c | 4 +- remoteobject.c | 144 +++++++++++++++++-------------------------------- 2 files changed, 52 insertions(+), 96 deletions(-) diff --git a/conman.c b/conman.c index 86a0c163..8d99bb05 100644 --- a/conman.c +++ b/conman.c @@ -1098,7 +1098,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 0; } - if(self->iProtocolID == 4) { /* act */ + if(self->iProtocolID == PROTACT) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1110,7 +1110,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 3) { + } else if(self->iProtocolID == PROTJSON) { myJson = mkJSON_Object(self,buffer,iOut); if(myJson != NULL){ SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); diff --git a/remoteobject.c b/remoteobject.c index 2eb19559..af72556e 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -36,6 +36,9 @@ #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ +#define READACT 7654 +#define POCHACT 8437 + static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); @@ -44,8 +47,7 @@ static int transactionID = 100000; typedef struct { char *host; int port; - int readHandle; - int writeHandle; + int handle; int transactHandle; int readList; int writeList; @@ -79,69 +81,13 @@ void KillRemoteOBJ(void *data) snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); StopTask(pServ->pTasker,roTaskName); free(self->host); - ANETclose(self->readHandle); - ANETclose(self->writeHandle); + ANETclose(self->handle); ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); LLDdeleteBlob(self->writeList); json_tokener_free(self->jtok); } } -/*========================= reading related code ================================*/ -static int RemoteReadCallback(int handle, void *userData) -{ - int length; - char *pPtr, *pStart, *pEnd; - - pPtr = ANETreadPtr(handle,&length); - - /* - * deal with command results - */ - pStart = strstr(pPtr, "TRANSACTIONSTART"); - pEnd = strstr(pPtr,"TRANSACTIONEND"); - if(pStart != NULL && pEnd != NULL){ - pStart = pStart + strlen("TRANSACTIONSTART"); - *pEnd = '\0'; - traceIO("RO","Received command - reply: %s", pStart); - pEnd += strlen("TRANSACTIONEND"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with update messages - */ - pStart = strstr(pPtr, "SROC:"); - pEnd = strstr(pPtr,":EROC\r\n"); - if(pStart != NULL && pEnd != NULL){ - pStart += strlen("SROC:"); - *pEnd = '\0'; - InterpExecute(pServ->pSics, pServ->dummyCon,pStart); - traceIO("RO", "Received %s from remote", pStart); - pEnd += strlen("EROC\r\n"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with heartbeats - */ - if((pStart = strstr(pPtr,"Poch")) != NULL){ - ANETreadConsume(handle,(pStart+4) - pPtr); - } - - /* - If there is more stuff to process: recurse - */ - pPtr = ANETreadPtr(handle,&length); - if(length > 0 && - ( strstr(pPtr,":EROC\r\n") != NULL || - strstr(pPtr,"TRANSACTIONEND") != NULL - || strstr(pPtr,"Poch") != NULL ) ) { - RemoteReadCallback(handle,userData); - } - - return 1; -} /*-----------------------------------------------------------------------------*/ static int transactCommand(int handle, char *command, char *reply, int replyLen) { @@ -209,10 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->writeHandle = ANETconnect(self->host, self->port); - self->readHandle = ANETconnect(self->host, self->port); + self->handle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ + if(self->handle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -225,26 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self) Default login with hard coded manager login. Defined in nserver.c */ - ANETwrite(self->readHandle,login,strlen(login)); - ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->handle,login,strlen(login)); ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ - pPtr = ANETreadPtr(self->readHandle, &length); - ANETreadConsume(self->readHandle,length); - pPtr = ANETreadPtr(self->writeHandle, &length); - ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->handle, &length); + ANETreadConsume(self->handle,length); pPtr = ANETreadPtr(self->transactHandle, &length); ANETreadConsume(self->transactHandle,length); + transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1); - /* - * install the read callback - */ - ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); /* * Remove geterror on read nodes and reinstall callbacks for reconnects @@ -255,14 +194,13 @@ static void ConnectRemoteObject(pRemoteOBJ self) node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror",NULL); - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); } status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1); self->connected = 1; } @@ -291,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, pUpdateCallback uppi = (pUpdateCallback)userData; hdbDataMessage *mm = NULL; pDynString text; - char *prefix = {"SROC:hupdate "}; - char *postfix= {":EROC\r\n"}; + char *prefix = {"hupdate "}; + char *postfix= {" \r\n"}; char *txt = NULL; int length; pHdbPropertyChange propChange = NULL; @@ -330,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, if(!SCisConnected(uppi->sendCon)){ return hdbKill; } - length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + + length = strlen("hdelprop ") + strlen(uppi->remotePath) + strlen(propChange->key) + 10; if(propChange->value != NULL){ length += strlen(propChange->value); @@ -340,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, return hdbContinue; } if(propChange->value == NULL){ - snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, + snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath, propChange->key,postfix); } else { - snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, + snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath, propChange->key,propChange->value, postfix); } SCWrite(uppi->sendCon,txt,eValue); @@ -469,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Install a callback on the remote node to update the master. The remote should * then immediatly send an update which will be processed by the read callback. */ - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); return 1; } @@ -513,10 +451,10 @@ static int HeartbeatTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; int status; - char command[] = {"Poch\r\n"}; + char command[] = {"contextdo 8437 Poch\r\n"}; if (time(NULL) > self->nextHeartbeat){ - status = ANETwrite(self->readHandle,command, strlen(command)); + status = ANETwrite(self->handle,command, strlen(command)); if(status != 1){ traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); self->connected = 0; @@ -561,11 +499,11 @@ static int WriteResponseTask(void *pData) OutCode eOut; writeData WD; - if(!ANETvalidHandle(self->writeHandle)) { + if(!ANETvalidHandle(self->handle)) { return 1; } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); while(length > 0){ json_tokener_reset(self->jtok); message = json_tokener_parse_ex(self->jtok,pText,length); @@ -575,19 +513,19 @@ static int WriteResponseTask(void *pData) } else if(tokerr != json_tokener_success) { traceIO("RO","JSON parsing error %s on %s from %s %d", json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } if(json_object_get_type(message) != json_type_object) { traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } /* we need to consume here what has been parsed. The char_offset in the tokenizer structure might tell us that... */ - ANETreadConsume(self->writeHandle,self->jtok->char_offset); + ANETreadConsume(self->handle,self->jtok->char_offset); /* @@ -616,6 +554,24 @@ static int WriteResponseTask(void *pData) pText = (char *)json_object_get_string(data); traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); + + /* + do nothing on Poch + */ + if(transID == POCHACT){ + pText = ANETreadPtr(self->handle,&length); + continue; + } + + /* + process update messages + */ + if(transID == READACT){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + traceIO("RO","Received %s from remote",pText); + pText = ANETreadPtr(self->handle,&length); + continue; + } status = LLDnodePtr2First(self->writeList); while(status == 1){ @@ -646,7 +602,7 @@ static int WriteResponseTask(void *pData) } status = LLDnodePtr2Next(self->writeList); } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); } return 1; @@ -701,7 +657,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); LLDblobAppend(self->writeList,&WD,sizeof(writeData)); - status = ANETwrite(self->writeHandle,command,strlen(command)); + status = ANETwrite(self->handle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -927,7 +883,7 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); - snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; From c60e9920c5d1e36261fdbc4ac639f11fab3e4b61 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Wed, 20 May 2015 08:55:49 +0200 Subject: [PATCH 5/5] First working verion of remote object Fixed two bugs along the way: - asynnet: ANETReadPtr now sets length to 0 if ther eis no data - motorsec: if the motor does not start the status is set to error --- asynnet.c | 1 + motorsec.c | 2 +- remoteobject.c | 97 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 65 insertions(+), 35 deletions(-) diff --git a/asynnet.c b/asynnet.c index 46592d9b..7b707e61 100644 --- a/asynnet.c +++ b/asynnet.c @@ -581,6 +581,7 @@ void *ANETreadPtr(int handle, int *length) con = findSocketDescriptor(handle); if (con == NULL) { + *length = 0; return NULL; } else { data = GetRWBufferData(con->readBuffer, length); diff --git a/motorsec.c b/motorsec.c index 0288e4bc..723326f7 100644 --- a/motorsec.c +++ b/motorsec.c @@ -527,7 +527,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, SCSetInterrupt(pCon, eAbortBatch); self->pDrivInt->iErrorCount = 0; child = GetHipadabaNode(self->pDescriptor->parNode, "status"); - UpdateHipadabaPar(child, MakeHdbText("run"), pCon); + UpdateHipadabaPar(child, MakeHdbText("error"), pCon); return hdbAbort; } diff --git a/remoteobject.c b/remoteobject.c index af72556e..0e78a3a4 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -489,6 +489,44 @@ static OutCode findOutCode(char *txt) return eValue; } /*-----------------------------------------------------------------------------------*/ +static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText) +{ + int status; + writeData WD; + + status = LLDnodePtr2First(writeList); + while(status == 1){ + LLDblobData(writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { + /* skip */ + return; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(writeList); + LLDblobAppend(writeList,&WD, sizeof(writeData)); + return; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; + } else { + if(strstr(pText,"OK") == NULL){ + SCWrite(WD.pCon,pText,eOut); + } + return; + } + } + status = LLDnodePtr2Next(writeList); + } + } +/*-----------------------------------------------------------------------------------*/ static int WriteResponseTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; @@ -560,6 +598,7 @@ static int WriteResponseTask(void *pData) */ if(transID == POCHACT){ pText = ANETreadPtr(self->handle,&length); + json_object_put(message); continue; } @@ -567,41 +606,21 @@ static int WriteResponseTask(void *pData) process update messages */ if(transID == READACT){ - InterpExecute(pServ->pSics,pServ->dummyCon,pText); + if(strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + } traceIO("RO","Received %s from remote",pText); pText = ANETreadPtr(self->handle,&length); + json_object_put(message); continue; } + + /* + check write List + */ + CheckWriteList(self->writeList,transID,eOut,pText); + json_object_put(message); - status = LLDnodePtr2First(self->writeList); - while(status == 1){ - LLDblobData(self->writeList,&WD); - if(WD.transID == transID){ - if(strstr(pText,"COMSTART") != NULL){ - /* skip */ - } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { - SCDeleteConnection(WD.pCon); - LLDblobDelete(self->writeList); - return 1; - } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { - /* skip */ - return 1; - } else if(strstr(pText,"TASKSTART") != NULL){ - WD.waitTask = 1 ; - LLDblobDelete(self->writeList); - LLDblobAppend(self->writeList,&WD, sizeof(writeData)); - return 1; - } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ - SCDeleteConnection(WD.pCon); - LLDblobDelete(self->writeList); - return 1; - } else { - SCWrite(WD.pCon,pText,eOut); - return 1; - } - } - status = LLDnodePtr2Next(self->writeList); - } pText = ANETreadPtr(self->handle,&length); } @@ -661,10 +680,15 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, free(command); DeleteDynString(data); if(status < 0){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); + self->connected = 0; + ConnectRemoteObject(self); + if(self->connected == 0){ + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", + remoteNode, self->host); + } + return hdbAbort; } - return hdbAbort; } return hdbContinue; } @@ -694,6 +718,11 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) SetHdbProperty(localNode,"remotewrite",rd.remoteNode); AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, self,NULL)); + /* + TODO: The connected write nodes should be held in a list in order to be able to + remove the write callbacks when deleting the remote object. As removing remote + objects usually only happens when SICS shuts down this is not so important. + */ /* * Get information about the remote node and check compatability @@ -764,7 +793,7 @@ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) */ memset(answer,0,sizeof(answer)-1); status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); - if(status){ + if(status == 1){ SCWrite(pCon,answer,eValue); } else { SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);