From 0d0c90cee7da6aff92f34183a56f66c704b54936 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 19 Mar 2015 16:12:35 +0100 Subject: [PATCH 01/13] Another try ad getting write to work --- conman.c | 9 ++++++++- conman.h | 1 + interface.c | 6 ++++++ remoteobject.c | 27 +++++++++++++++++++++++---- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/conman.c b/conman.c index 88b2ad1f..f9ca2242 100644 --- a/conman.c +++ b/conman.c @@ -229,6 +229,7 @@ static SConnection *CreateConnection(SicsInterp * pSics) pRes->conStart = time(NULL); pRes->write = SCNormalWrite; pRes->runLevel = RUNDRIVE; + pRes->remote = 0; /* initialise context variables */ pRes->iCmdCtr = 0; @@ -461,6 +462,7 @@ SConnection *SCCopyConnection(SConnection * pCon) result->iList = -1; result->runLevel = pCon->runLevel; result->data = pCon->data; + result->remote = pCon->remote; return result; } @@ -1768,6 +1770,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) config File Filename Logs to another file config output normal | withcode | ACT Sets output mode config listen 0 | 1 enables commandlog listen mode + config remote sets the remote connection flag ---------------------------------------------------------------------------*/ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, @@ -1825,7 +1828,11 @@ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, SCSendOK(pCon); return 1; } - } + } else if(strcmp(argv[1],"remote") == 0) { + pMaster->remote = 1; + pCon->remote = 1; + return 1; + } /* check no or args */ if (argc < 3) { diff --git a/conman.h b/conman.h index 91befe22..99a9dd16 100644 --- a/conman.h +++ b/conman.h @@ -71,6 +71,7 @@ typedef struct __SConnection { pCosta pStack; /* stack of pending commands */ int contextStack; /* context stack: may go? */ mkChannel *pSock; /* for temporary backwards compatability */ + int remote; /* true if this is a remote object connection */ } SConnection; #include "nserver.h" diff --git a/interface.c b/interface.c index d661b2c3..f3b9790f 100644 --- a/interface.c +++ b/interface.c @@ -241,6 +241,9 @@ static int DriveTaskFunc(void *data) } else { ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } + if(taskData->pCon->remote){ + SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); return 0; } @@ -393,6 +396,9 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->remote){ + SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + } return 0; } /*--------------------------------------------------------------------------*/ diff --git a/remoteobject.c b/remoteobject.c index 5cad0a91..c1912769 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -240,8 +240,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)); - + transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); + transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1); + self->connected = 1; self->writeInUse = 0; } @@ -537,7 +538,7 @@ static void printSICS(char *answer, SConnection *pCon) while(pPtr != NULL){ memset(line,0,sizeof(line)); pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL){ + if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){ pCode = strstr(line,"@@"); if(pCode != NULL){ *pCode = '\0'; @@ -578,6 +579,7 @@ static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle *newHandle = 1; transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); + transactCommand(handle,"config remote\r\n",command,sizeof(command)-1); } else { self->writeInUse = 1; @@ -609,7 +611,7 @@ static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) printSICS(answer,pCon); } traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,length); + ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer); break; } } @@ -688,6 +690,23 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } } + /* + Do I have to wait for a TASKFINISHED? + */ + command = GetHdbProp(currentNode,"taskwait"); + if(command != NULL){ + while(1) { + TaskYield(pServ->pTasker); + answer = ANETreadPtr(handle,&length); + if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){ + printSICS(answer,pCon); + traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); + ANETreadConsume(handle,length); + break; + } + } + } + if(newHandle){ ANETclose(handle); From b7eaa538ed90cbad11d6c0b7b037600594a18b73 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 23 Apr 2015 15:52:05 +0200 Subject: [PATCH 02/13] - Rewrote the write part of remoteobject - With transactionID > 10^6, now termination messages are sent by contextdo and tasks - This still has bugs --- conman.c | 2 +- interface.c | 16 ++- protocol.c | 6 + remoteobject.c | 342 +++++++++++++++++++++---------------------------- 4 files changed, 167 insertions(+), 199 deletions(-) diff --git a/conman.c b/conman.c index f9ca2242..e2b993ac 100644 --- a/conman.c +++ b/conman.c @@ -1744,7 +1744,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand) memset(pBueffel, 0, 80); stptok(trim(pCommand), pBueffel, 79, " "); self->iCmdCtr++; - if (999999 < self->iCmdCtr) { + if (self->iCmdCtr > 99998) { self->iCmdCtr = 0; } self->transID = self->iCmdCtr; diff --git a/interface.c b/interface.c index f3b9790f..6f84a604 100644 --- a/interface.c +++ b/interface.c @@ -241,10 +241,10 @@ static int DriveTaskFunc(void *data) } else { ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); - } traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); + } return 0; } /*--------------------------------------------------------------------------*/ @@ -275,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget) ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); InvokeNewTarget(pServ->pExecutor,name,fTarget); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = DRIVEID; taskData->obj = obj; @@ -396,8 +399,8 @@ static int CountTaskFunc(void *data) ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); } traceSys("count","CountTask %s finished with state %d", taskData->name,status); - if(taskData->pCon->remote){ - SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); + if(taskData->pCon->transID > 100000) { + SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID); } return 0; } @@ -424,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name) } ExeInterest(pServ->pExecutor,name,"started"); DevexecLog("START",name); + if(pCon->transID > 100000) { + SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID); + } taskData->id = COUNTID; taskData->obj = obj; diff --git a/protocol.c b/protocol.c index 9891913d..7908bec0 100644 --- a/protocol.c +++ b/protocol.c @@ -188,7 +188,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData, SCWrite(pCon, "ERROR: no more memory", eError); return 0; } + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID); + } status = InterpExecute(pSics, comCon, command); + if(comCon->transID > 100000) { + SCPrintf(comCon,eLog,"COMEND %d", comCon->transID); + } if (command != buffer) free(command); SCDeleteConnection(comCon); diff --git a/remoteobject.c b/remoteobject.c index c1912769..b6ec41ee 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -2,7 +2,7 @@ * Remote objects in sicsobj. This means accessing remote objects in a different * SICS server from a master SICS server. * - * Reading is implementd according to this scheme: + * Reading is implemented according to this scheme: * * * When a read connection is made between a local node and a remote node in slave, then a * callback is installed on remote node in slave. @@ -31,22 +31,27 @@ #include #include #include +#include #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); + +static int transactionID = 100000; /*---------------------- our very private data structure -------------------*/ typedef struct { char *host; int port; int readHandle; int writeHandle; - int writeInUse; + int transactHandle; int readList; + int writeList; unsigned int connected; time_t nextHeartbeat; + struct json_tokener *jtok; } RemoteOBJ, *pRemoteOBJ; /*----------------------------------------------------------------------------*/ typedef struct { @@ -59,6 +64,12 @@ typedef struct { char *remotePath; } UpdateCallback, *pUpdateCallback; /*----------------------------------------------------------------------------*/ +typedef struct { + int transID; + SConnection *pCon; + int waitTask; +}writeData, *pWriteData; +/*----------------------------------------------------------------------------*/ void KillRemoteOBJ(void *data) { char roTaskName[132]; @@ -70,7 +81,10 @@ void KillRemoteOBJ(void *data) free(self->host); ANETclose(self->readHandle); ANETclose(self->writeHandle); + ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); + LLDdeleteBlob(self->writeList); + json_tokener_free(self->jtok); } } /*========================= reading related code ================================*/ @@ -135,7 +149,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) char *prefix = {"transact "}; int status, length, type; time_t start; - char *pPtr; + char *pPtr, *pEnd; /* * read possible dirt of the line @@ -144,12 +158,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) ANETreadConsume(handle,length); - toSend = malloc(strlen(command) + strlen(prefix) + 1); + toSend = malloc(strlen(command) + strlen(prefix) + 10); if(toSend == NULL){ return OOM; } strcpy(toSend, prefix); strcat(toSend, command); + if(strstr(command,"\n") == NULL){ + strcat(toSend,"\r\n"); + } status = ANETwrite(handle,toSend,strlen(toSend)); free(toSend); if(status != 1){ @@ -163,7 +180,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen) while(time(NULL) < start + 2.0){ ANETprocess(); pPtr = ANETreadPtr(handle,&length); - if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ + if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){ + *pEnd = '\0'; strncpy(reply,pPtr,replyLen); ANETreadConsume(handle,length); return 1; @@ -193,7 +211,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0){ + self->transactHandle = ANETconnect(self->host, self->port); + if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -208,6 +227,7 @@ static void ConnectRemoteObject(pRemoteOBJ self) */ ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* @@ -217,6 +237,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) ANETreadConsume(self->readHandle,length); pPtr = ANETreadPtr(self->writeHandle, &length); ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->transactHandle, &length); + ANETreadConsume(self->transactHandle,length); /* @@ -240,11 +262,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); - transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1); + transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1); self->connected = 1; - self->writeInUse = 0; } /*-----------------------------------------------------------------------------*/ static void MarkDisconnected(pRemoteOBJ self) @@ -398,7 +418,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ /* * try a reconnect, @@ -408,7 +428,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) */ self->connected = 0; ConnectRemoteObject(self); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -510,11 +530,13 @@ static int HeartbeatTask(void *pData) return 1; } /*============================= writing related code =========================== - The logic here is to use the standard writeHandle when available. I expect most - communication to be short and to happen through the writeHandle. If that one is - in use, a new connection will be built. - --------------------------------------------------------------------------------- - suppress all superfluous OK from the slave + This works by sending the command via contextdo with a ID > 10^6. This causes + the remote SICS to send the termination messages. The transaction IDs together + with the connection responsible for it are kept in a list. + + This list is used by the write task to forward messages properly and for handling + termination. + -----------------------------------------------------------------------------------*/ #include static OutCode findOutCode(char *txt) @@ -528,94 +550,111 @@ static OutCode findOutCode(char *txt) } return eValue; } -/*--------------------------------------------------------------------------------*/ -static void printSICS(char *answer, SConnection *pCon) +/*-----------------------------------------------------------------------------------*/ +static int WriteResponseTask(void *pData) { - char line[1024], *pPtr, *pCode; - OutCode eCode; + pRemoteOBJ self = (pRemoteOBJ)pData; + int status, length = 0, transID; + char *pText, *outTxt; + json_object *message = NULL, *data = NULL; + enum json_tokener_error tokerr; + OutCode eOut; + writeData WD; - pPtr = answer; - while(pPtr != NULL){ - memset(line,0,sizeof(line)); - pPtr = stptok(pPtr,line,sizeof(line),"\n"); - if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){ - pCode = strstr(line,"@@"); - if(pCode != NULL){ - *pCode = '\0'; - pCode += 2; - eCode = findOutCode(trim(pCode)); - - } else { - eCode = eValue; - } - SCWrite(pCon,line,eCode); - } + if(!ANETvalidHandle(self->writeHandle)) { + return 1; } -} -/*---------------------------------------------------------------------------------*/ -static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) -{ - int handle, length; - char *answer = NULL; - char command[80]; - if(self->writeInUse) { - handle = ANETconnect(self->host,self->port); - if(handle < 0){ - traceIO("RO","Failed to connect to %s at %d", self->host, self->port); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); - } - return handle; + pText = ANETreadPtr(self->writeHandle,&length); + if(length > 0){ + json_tokener_reset(self->jtok); + message = json_tokener_parse_ex(self->jtok,pText,length); + tokerr = self->jtok->err; + if(tokerr == json_tokener_continue){ + return 1; + } else if(tokerr != json_tokener_success) { + traceIO("RO","JSON parsing error %s on %s from %s", + json_tokener_errors[tokerr], pText, self->host); + ANETreadConsume(self->writeHandle,length); + return 1; + } + if(json_object_get_type(message) != json_type_object) { + traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); + ANETreadConsume(self->writeHandle,length); + return 1; } - ANETwrite(handle,login,strlen(login)); - usleep(500); - ANETprocess(); /* - eat the login responses + we need to consume here what has been parsed. + The char_offset in the tokenizer structure might tell us that... */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); - *newHandle = 1; + ANETreadConsume(self->writeHandle,self->jtok->char_offset); - transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); - transactCommand(handle,"config remote\r\n",command,sizeof(command)-1); - } else { - self->writeInUse = 1; - handle = self->writeHandle; /* - eat dirt from the line + Received a valid message, process */ - answer = ANETreadPtr(handle, &length); - ANETreadConsume(handle,length); + data = json_object_object_get(message,"trans"); + if(data == NULL){ + traceIO("RO","No transaction ID found in %s from %s", pText,self->host); + return 1; + } + transID = json_object_get_int(data); + + data = json_object_object_get(message,"flag"); + if(data == NULL){ + traceIO("RO","No flag found in %s from %s", pText,self->host); + return 1; + } + outTxt = (char *)json_object_get_string(data); + eOut = findOutCode(outTxt); + + data = json_object_object_get(message,"data"); + if(data == NULL){ + traceIO("RO","No data found in %s from %s", pText,self->host); + return 1; + } + pText = (char *)json_object_get_string(data); + + status = LLDnodePtr2First(self->writeList); + while(status == 1){ + LLDblobData(self->writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(self->writeList); + LLDblobAppend(self->writeList,&WD, sizeof(writeData)); + return 1; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(self->writeList); + return 1; + } else { + SCWrite(WD.pCon,pText,eOut); + return 1; + } + } + status = LLDnodePtr2Next(self->writeList); + } + } - return handle; + + return 1; } + /*---------------------------------------------------------------------------------*/ -static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) +static int IncrementTransactionID() { - char *answer = NULL, *pEnd, *command = NULL; - int length; - - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - printSICS(answer,pCon); - } - traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); - ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer); - break; - } + transactionID++; + if(transactionID >= 200000){ + transactionID = 100000; } - + return transactionID; } /*---------------------------------------------------------------------------------*/ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, @@ -628,14 +667,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, pDynString data; char *remoteNode; char *command, *answer, *pEnd; - + writeData WD; if((mm = GetHdbSetMessage(mes)) != NULL){ pCon = (SConnection *)mm->callData; - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return hdbAbort; - } /* build the command to send @@ -650,13 +685,17 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); + WD.pCon = SCCopyConnection(pCon); + WD.waitTask = 0; + WD.transID = IncrementTransactionID(); + snprintf(command,length,"contextdo %d hset %s %s\r\n", + WD.transID, remoteNode, GetCharArray(data)); /* write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); - status = ANETwrite(handle,command,strlen(command)); + status = ANETwrite(self->writeHandle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -665,55 +704,8 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - - /* - wait for a response: TRANSACTIONFINISHED - */ - ProcessWriteResponse(self,handle,pCon); - - /* - Is there a termination script? - */ - command = GetHdbProp(currentNode,"termscript"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - Tcl_Eval(InterpGetTcl(pServ->pSics),command); - answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); - if(strstr(answer,"idle") != NULL){ - answer = ANETreadPtr(handle,&length); - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - /* - Do I have to wait for a TASKFINISHED? - */ - command = GetHdbProp(currentNode,"taskwait"); - if(command != NULL){ - while(1) { - TaskYield(pServ->pTasker); - answer = ANETreadPtr(handle,&length); - if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){ - printSICS(answer,pCon); - traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); - ANETreadConsume(handle,length); - break; - } - } - } - - - if(newHandle){ - ANETclose(handle); - } else { - self->writeInUse = 0; - } - + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); + return hdbContinue; } return hdbContinue; @@ -746,7 +738,7 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); - status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + status = transactCommand(self->transactHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); @@ -803,61 +795,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, /*============================ remote execute =================================*/ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) { - int status, handle, newHandle = 0, length; - char *answer, *pEnd; - - handle = PrepareWriteHandle(self,pCon,&newHandle); - if(handle < 0){ - return 0; - } + int status; + char answer[65536]; /* write, thereby taking care to prefix with transact and for proper termination */ - if(strstr(command,"transact") == NULL){ - ANETwrite(handle,"transact ", sizeof("transact ")); - } - status = ANETwrite(handle,command,strlen(command)); - if(strstr(command,"\n") == NULL){ - ANETwrite(handle,"\r\n",2); - } - if(status < 0){ - traceIO("RO","Disconnect from %s while executing %s", self->host, command); - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - return 0; - } - - /* - wait for response - */ - while(1){ - TaskYield(pServ->pTasker); - if(!ANETvalidHandle(handle)){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); - } - break; - } - answer = ANETreadPtr(handle,&length); - if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ - if(pCon != NULL){ - *pEnd = '\0'; - SCPrintf(pCon,eValue,answer); - } - ANETreadConsume(handle,length); - break; - } - } - - if(newHandle){ - ANETclose(handle); + memset(answer,0,sizeof(answer)-1); + status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); + if(status){ + SCWrite(pCon,answer,eValue); } else { - self->writeInUse = 0; + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } - return 1; + return status; } /*------------------------------------------------------------------------------*/ static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, @@ -939,6 +891,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, self->host = strdup(argv[2]); self->port = atoi(argv[3]); self->readList = LLDblobCreate(); + self->writeList = LLDblobCreate(); + self->jtok = json_tokener_new(); ConnectRemoteObject(self); cmd = AddSICSHdbPar(pNew->objectNode, @@ -968,6 +922,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); + snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; } From e497e2daf305af85ded20f8f6b5c10d4dfb16c1a Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 30 Apr 2015 09:25:36 +0200 Subject: [PATCH 03/13] Fixed minor typos and compilation issues --- devser.c | 3 --- motorsec.c | 2 +- scriptcontext.c | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/devser.c b/devser.c index fc1f90b2..bb6a7a92 100644 --- a/devser.c +++ b/devser.c @@ -151,9 +151,6 @@ static DevAction *DevNextAction(DevSer * devser) } static void LogStart(DevSer *self) { - if(self->startTime > 0){ - printf("DEVSER: there is something fucked up in LogStart. Investigate!\n"); - } self->startTime = DoubleTime(); } static void LogResponse(DevSer *self, int error) diff --git a/motorsec.c b/motorsec.c index 6f0ed33a..0288e4bc 100644 --- a/motorsec.c +++ b/motorsec.c @@ -298,7 +298,7 @@ static int SecMotorStatus(void *sulf, SConnection * pCon) int status; pHdb node = NULL; hdbValue v; - float interrupt; + float interrupt = 0.; char error[132]; assert(sulf); diff --git a/scriptcontext.c b/scriptcontext.c index e1152fc3..b63e565b 100644 --- a/scriptcontext.c +++ b/scriptcontext.c @@ -553,7 +553,7 @@ static char *SctActionHandler(void *actionData, char *lastReply, } else { l = strlen(origScript); } - snprintf(eprop, sizeof eprop, "error_in_%.*s", l, origScript); + snprintf(eprop, sizeof eprop, "error_in_%s", origScript); emsg = GetHdbProp(node, eprop); cnt = 0; if (emsg != NULL) { From b770b8c0046971dcf31e172adb646fd7ccb327a3 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Thu, 30 Apr 2015 09:26:22 +0200 Subject: [PATCH 04/13] stptok playing up --- stptok.h | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/stptok.h b/stptok.h index e4533d7f..6e03e451 100644 --- a/stptok.h +++ b/stptok.h @@ -1,14 +1,14 @@ -/* -** stptok() -- public domain by Ray Gardner, modified by Bob Stout -** -** You pass this function a string to parse, a buffer to receive the -** "token" that gets scanned, the length of the buffer, and a string of -** "break" characters that stop the scan. It will copy the string into -** the buffer up to any of the break characters, or until the buffer is -** full, and will always leave the buffer null-terminated. It will -** return a pointer to the first non-breaking character after the one -** that stopped the scan. -*/ +/* +** stptok() -- public domain by Ray Gardner, modified by Bob Stout +** +** You pass this function a string to parse, a buffer to receive the +** "token" that gets scanned, the length of the buffer, and a string of +** "break" characters that stop the scan. It will copy the string into +** the buffer up to any of the break characters, or until the buffer is +** full, and will always leave the buffer null-terminated. It will +** return a pointer to the first non-breaking character after the one +** that stopped the scan. +*/ #ifndef STPSTPTOK #define STPSTPTOK char *stptok(const char *s, char *tok, size_t toklen, char *brk); From aef2a36b605bdec186f520ba5a086eb0fd95ca9e Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Mon, 4 May 2015 08:02:37 +0200 Subject: [PATCH 05/13] Now, I fiddling with the message writitng code, I get all remote object messages but in the wrong order. They are sent in the right order, though. Have to chnage the code to use one connection asynchronously for everything only. --- conman.c | 29 ++++++++++++++++++++++++++--- remoteobject.c | 19 ++++++++++++------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/conman.c b/conman.c index 5a8542b0..d2d06705 100644 --- a/conman.c +++ b/conman.c @@ -74,6 +74,15 @@ #include "sicshipadaba.h" #include "protocol.h" #include "sicsvar.h" +#include + +/* + Greetings from protocol.c for SCLogWrite... +*/ +extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer, + int iOut); + + /* #define UUDEB 1 define UUDEB , for buffer writing for checking encoding */ @@ -1082,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; /* for commandlog tail */ if (!VerifyConnection(self)) { return 0; } - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1100,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == 3) { + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } @@ -1113,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) { char pBueffel[1024]; char *pPtr; + json_object *myJson = NULL; if (!VerifyConnection(self)) { return 0; @@ -1121,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) WriteToCommandLogId(NULL, self->sockHandle, buffer); SetSendingConnection(NULL); - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1133,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 2) { + } else if(self->iProtocolID == 2) { /* withcode */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1145,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == 3) { /* json */ + myJson = mkJSON_Object(self,buffer,iOut); + if(myJson != NULL){ + SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); + json_object_put(myJson); + } } else { testAndWriteSocket(self, buffer, iOut); } diff --git a/remoteobject.c b/remoteobject.c index b6ec41ee..2eb19559 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -17,7 +17,7 @@ * * COPRYRIGHT: see file COPYRIGHT * - * Mark Koennecke, February 2015 + * Mark Koennecke, February-May 2015 **/ #include #include @@ -209,8 +209,8 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); + self->readHandle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port); if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ self->connected = 0; @@ -566,15 +566,15 @@ static int WriteResponseTask(void *pData) } pText = ANETreadPtr(self->writeHandle,&length); - if(length > 0){ + while(length > 0){ json_tokener_reset(self->jtok); message = json_tokener_parse_ex(self->jtok,pText,length); tokerr = self->jtok->err; if(tokerr == json_tokener_continue){ return 1; } else if(tokerr != json_tokener_success) { - traceIO("RO","JSON parsing error %s on %s from %s", - json_tokener_errors[tokerr], pText, self->host); + traceIO("RO","JSON parsing error %s on %s from %s %d", + json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); ANETreadConsume(self->writeHandle,length); return 1; } @@ -614,6 +614,8 @@ static int WriteResponseTask(void *pData) return 1; } pText = (char *)json_object_get_string(data); + + traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); status = LLDnodePtr2First(self->writeList); while(status == 1){ @@ -625,6 +627,9 @@ static int WriteResponseTask(void *pData) SCDeleteConnection(WD.pCon); LLDblobDelete(self->writeList); return 1; + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { + /* skip */ + return 1; } else if(strstr(pText,"TASKSTART") != NULL){ WD.waitTask = 1 ; LLDblobDelete(self->writeList); @@ -641,7 +646,7 @@ static int WriteResponseTask(void *pData) } status = LLDnodePtr2Next(self->writeList); } - + pText = ANETreadPtr(self->writeHandle,&length); } return 1; @@ -695,6 +700,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); + LLDblobAppend(self->writeList,&WD,sizeof(writeData)); status = ANETwrite(self->writeHandle,command,strlen(command)); free(command); DeleteDynString(data); @@ -704,7 +710,6 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, } return hdbAbort; } - LLDblobAppend(self->writeList,&WD,sizeof(writeData)); return hdbContinue; } From 7d535abd731bb5dc5dde3169bfe5866d6d1919a7 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Mon, 4 May 2015 08:04:54 +0200 Subject: [PATCH 06/13] Fixed a core dump in monitor mode in countersec.c --- countersec.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/countersec.c b/countersec.c index e393786b..2ec4f3e8 100644 --- a/countersec.c +++ b/countersec.c @@ -184,13 +184,17 @@ static int SecCtrCheckStatus(void *pData, SConnection *pCon) fControl = v.v.doubleValue; } else { node = GetHipadabaNode(self->pDes->parNode,"values"); - assert(node != NULL); - /* - The 1 below is only correct for PSI where only the first - monitor can be the control monitor. Elsewhere this must be the - control monitor channel - */ - fControl = v.v.intArray[1]; + if(node != NULL) { + /* + This can be NULL if the counter is a HM. The values does not + exist and fControl is useless + + The 1 below is only correct for PSI where only the first + monitor can be the control monitor. Elsewhere this must be the + control monitor channel + */ + fControl = v.v.intArray[1]; + } } From 0bcd3b06f6e12c5fc519ca92f5f70915bd7af029 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Fri, 8 May 2015 13:43:49 +0200 Subject: [PATCH 07/13] Fixed a critical bug which prevented gumtree SE from working. No ACt was sent after removing the sycamore protocol --- conman.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/conman.c b/conman.c index a1d62e34..c8bad638 100644 --- a/conman.c +++ b/conman.c @@ -1323,7 +1323,7 @@ int SCWriteZipped(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == 4) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); @@ -1398,7 +1398,7 @@ int SCWriteBinary(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == 4) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN BIN %s %d %d\r\n", pName, iDataLen, cc.transID); @@ -1504,7 +1504,7 @@ int SCWriteZippedOld(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 5) { + if (protocolID == 4) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); From bb7eb497de6bc40d88ec721c19864652bf14012c Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Mon, 11 May 2015 09:30:44 +0200 Subject: [PATCH 08/13] Refactored protocol code to use defined IDs rather then raw integers --- conman.c | 10 +++++----- nread.c | 12 +++++++----- protocol.c | 43 ++++++++++++++++++++++++------------------- protocol.h | 8 ++++++++ 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/conman.c b/conman.c index c8bad638..ab685052 100644 --- a/conman.c +++ b/conman.c @@ -1086,7 +1086,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 0; } - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1119,7 +1119,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) WriteToCommandLogId(NULL, self->sockHandle, buffer); SetSendingConnection(NULL); - if(self->iProtocolID == 5) { + if(self->iProtocolID == 4) { if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1323,7 +1323,7 @@ int SCWriteZipped(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 4) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); @@ -1398,7 +1398,7 @@ int SCWriteBinary(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 4) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN BIN %s %d %d\r\n", pName, iDataLen, cc.transID); @@ -1504,7 +1504,7 @@ int SCWriteZippedOld(SConnection * self, char *pName, void *pData, memset(outBuf, 0, 65536); protocolID = GetProtocolID(self); - if (protocolID == 4) { + if (protocolID == PROTACT) { cc = SCGetContext(self); sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName, compressedLength, cc.transID); diff --git a/nread.c b/nread.c index 3cb546e3..33b98f5a 100644 --- a/nread.c +++ b/nread.c @@ -42,6 +42,8 @@ #include "commandlog.h" #include "uselect.h" #include "trace.h" +#include "protocol.h" + extern pServer pServ; extern int VerifyChannel(mkChannel * self); /* defined in network.c */ @@ -296,7 +298,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem) if (strlen(pItem->pHold) > 0) { strlcat(pItem->pHold, pPtr, 511); /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -308,7 +310,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem) } else { /* no, normal command */ /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -498,7 +500,7 @@ static int TelnetRead(pNetRead self, pNetItem pItem) case '\r': case '\n': /* DFC locking for protocol zero only */ - if (pItem->pCon->iProtocolID == 0 && + if (pItem->pCon->iProtocolID == PROTSICS && CostaLocked(pItem->pCon->pStack)) iStat = 0; else @@ -1076,7 +1078,7 @@ static int CommandDataCB(int handle, void *userData) if (pPtr[i] == '\r' || pPtr[i] == '\n') { self->state = SKIPTERM; if (!testAndInvokeInterrupt(self, handle)) { - if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack)) + if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack)) status = 0; else status = CostaTop(self->pCon->pStack, GetCharArray(self->command)); @@ -1180,7 +1182,7 @@ static int ANETTelnetProcess(int handle, void *usData) case '\r': case '\n': if (!testAndInvokeInterrupt(self, handle)) { - if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack)) + if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack)) status = 0; else status = CostaTop(self->pCon->pStack, GetCharArray(self->command)); diff --git a/protocol.c b/protocol.c index b35bfcd1..be034a9d 100644 --- a/protocol.c +++ b/protocol.c @@ -32,6 +32,10 @@ typedef struct __Protocol { int isDefaultSet; char *pProList[PROLISTLEN]; /* list of valid protocols? */ } Protocol; +/*================================================================================================ + WARNING: These two char arrays may replicate things defined elsewhere. They may be out of + sync with the rest of SIS. Keep in mind..... + ==================================================================================================*/ char *pEventType[] = { "VALUECHANGE", /* 0 */ @@ -271,29 +275,29 @@ static int ProtocolSet(SConnection * pCon, Protocol * pPro, char *pProName) return 0; break; - case 1: /* normal (connection start default) */ + case PROTNORM: /* normal (connection start default) */ SCSetWriteFunc(pMaster, SCNormalWrite); SCSetWriteFunc(pCon, SCNormalWrite); break; - case 2: /* outcodes */ + case PROTCODE: /* outcodes */ SCSetWriteFunc(pMaster, SCWriteWithOutcode); SCSetWriteFunc(pCon, SCWriteWithOutcode); break; - case 3: /* json */ + case PROTJSON: /* json */ SCSetWriteFunc(pCon, SCWriteJSON_String); SCSetWriteFunc(pMaster, SCWriteJSON_String); break; - case 4: /* ACT */ + case PROTACT: /* ACT */ SCSetWriteFunc(pMaster, SCACTWrite); SCSetWriteFunc(pCon, SCACTWrite); break; - case 5: + case PROTALL: SCSetWriteFunc(pMaster, SCAllWrite); SCSetWriteFunc(pCon, SCAllWrite); break; - case 0: /* default = psi_sics */ + case PROTSICS: /* default = psi_sics */ default: SCSetWriteFunc(pMaster, pPro->defaultWriter); SCSetWriteFunc(pCon, pPro->defaultWriter); @@ -332,11 +336,11 @@ int ProtocolGet(SConnection * pCon, void *pData, char *pProName, int len) /* check list of protocols for valid name */ switch (Index) { - case 0: /* default = psi_sics */ - case 1: /* normal (connection start default) */ - case 2: /* outcodes */ - case 3: /* json */ - case 4: /* act */ + case PROTSICS: /* default = psi_sics */ + case PROTNORM: /* normal (connection start default) */ + case PROTCODE: /* outcodes */ + case PROTJSON: /* json */ + case PROTACT: /* act */ pProName = pPro->pProList[Index]; return 1; break; @@ -441,7 +445,7 @@ static int InitDefaultProtocol(SConnection * pCon, Protocol * pPro) if (0 == pPro->isDefaultSet) { pPro->defaultWriter = SCGetWriteFunc(pCon); pPro->isDefaultSet = 1; - pCon->iProtocolID = 0; + pCon->iProtocolID = PROTSICS; } return pPro->isDefaultSet; } @@ -628,10 +632,11 @@ char *GetProtocolName(SConnection * pCon) /* check list of protocols for valid name */ switch (pCon->iProtocolID) { - case 0: /* default = psi_sics */ - case 1: /* normal (connection start default) */ - case 2: /* outcodes */ - case 3: /* json */ + case PROTSICS: /* default = psi_sics */ + case PROTNORM: /* normal (connection start default) */ + case PROTCODE: /* outcodes */ + case PROTJSON: /* json */ + case PROTACT: /* act */ return strdup(pPro->pProList[pCon->iProtocolID]); break; default: @@ -654,13 +659,13 @@ writeFunc GetProtocolWriteFunc(SConnection * pCon) { if (pCon != NULL) { switch (pCon->iProtocolID) { - case 2: /* outcodes */ + case PROTCODE: /* outcodes */ return SCWriteWithOutcode; break; - case 3: /* json */ + case PROTJSON: /* json */ return SCWriteJSON_String; break; - case 4: + case PROTACT: return SCACTWrite; break; default: diff --git a/protocol.h b/protocol.h index 4151b4f7..67316967 100644 --- a/protocol.h +++ b/protocol.h @@ -15,6 +15,14 @@ static char *pProTags[3] = { #define esStart -1 #define esFinish -2 +/*---------------------- protocol defines -------------------------------*/ +#define PROTSICS 0 +#define PROTNORM 1 +#define PROTCODE 2 +#define PROTJSON 3 +#define PROTACT 4 +#define PROTALL 5 + /*--------------------- lifecycle -------------------------------------- */ int InstallProtocol(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]); From e73743a97c74596c34ecb282504460e976458bbf Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Mon, 11 May 2015 14:47:19 +0200 Subject: [PATCH 09/13] Another protocol ID which escaped --- conman.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conman.c b/conman.c index ab685052..963a761d 100644 --- a/conman.c +++ b/conman.c @@ -1086,7 +1086,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 0; } - if(self->iProtocolID == 4) { + if(self->iProtocolID == PROTACT) { if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); From 9517a6d14e827bea53bf02d8f43d4bac48e616fd Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Tue, 12 May 2015 08:36:38 +0200 Subject: [PATCH 10/13] Switched remoteobject to use a single connection with ACT --- conman.c | 4 +- remoteobject.c | 144 +++++++++++++++++-------------------------------- 2 files changed, 52 insertions(+), 96 deletions(-) diff --git a/conman.c b/conman.c index 86a0c163..8d99bb05 100644 --- a/conman.c +++ b/conman.c @@ -1098,7 +1098,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 0; } - if(self->iProtocolID == 4) { /* act */ + if(self->iProtocolID == PROTACT) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1110,7 +1110,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 3) { + } else if(self->iProtocolID == PROTJSON) { myJson = mkJSON_Object(self,buffer,iOut); if(myJson != NULL){ SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); diff --git a/remoteobject.c b/remoteobject.c index 2eb19559..af72556e 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -36,6 +36,9 @@ #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ +#define READACT 7654 +#define POCHACT 8437 + static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); @@ -44,8 +47,7 @@ static int transactionID = 100000; typedef struct { char *host; int port; - int readHandle; - int writeHandle; + int handle; int transactHandle; int readList; int writeList; @@ -79,69 +81,13 @@ void KillRemoteOBJ(void *data) snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); StopTask(pServ->pTasker,roTaskName); free(self->host); - ANETclose(self->readHandle); - ANETclose(self->writeHandle); + ANETclose(self->handle); ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); LLDdeleteBlob(self->writeList); json_tokener_free(self->jtok); } } -/*========================= reading related code ================================*/ -static int RemoteReadCallback(int handle, void *userData) -{ - int length; - char *pPtr, *pStart, *pEnd; - - pPtr = ANETreadPtr(handle,&length); - - /* - * deal with command results - */ - pStart = strstr(pPtr, "TRANSACTIONSTART"); - pEnd = strstr(pPtr,"TRANSACTIONEND"); - if(pStart != NULL && pEnd != NULL){ - pStart = pStart + strlen("TRANSACTIONSTART"); - *pEnd = '\0'; - traceIO("RO","Received command - reply: %s", pStart); - pEnd += strlen("TRANSACTIONEND"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with update messages - */ - pStart = strstr(pPtr, "SROC:"); - pEnd = strstr(pPtr,":EROC\r\n"); - if(pStart != NULL && pEnd != NULL){ - pStart += strlen("SROC:"); - *pEnd = '\0'; - InterpExecute(pServ->pSics, pServ->dummyCon,pStart); - traceIO("RO", "Received %s from remote", pStart); - pEnd += strlen("EROC\r\n"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with heartbeats - */ - if((pStart = strstr(pPtr,"Poch")) != NULL){ - ANETreadConsume(handle,(pStart+4) - pPtr); - } - - /* - If there is more stuff to process: recurse - */ - pPtr = ANETreadPtr(handle,&length); - if(length > 0 && - ( strstr(pPtr,":EROC\r\n") != NULL || - strstr(pPtr,"TRANSACTIONEND") != NULL - || strstr(pPtr,"Poch") != NULL ) ) { - RemoteReadCallback(handle,userData); - } - - return 1; -} /*-----------------------------------------------------------------------------*/ static int transactCommand(int handle, char *command, char *reply, int replyLen) { @@ -209,10 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->writeHandle = ANETconnect(self->host, self->port); - self->readHandle = ANETconnect(self->host, self->port); + self->handle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ + if(self->handle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -225,26 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self) Default login with hard coded manager login. Defined in nserver.c */ - ANETwrite(self->readHandle,login,strlen(login)); - ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->handle,login,strlen(login)); ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ - pPtr = ANETreadPtr(self->readHandle, &length); - ANETreadConsume(self->readHandle,length); - pPtr = ANETreadPtr(self->writeHandle, &length); - ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->handle, &length); + ANETreadConsume(self->handle,length); pPtr = ANETreadPtr(self->transactHandle, &length); ANETreadConsume(self->transactHandle,length); + transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1); - /* - * install the read callback - */ - ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); /* * Remove geterror on read nodes and reinstall callbacks for reconnects @@ -255,14 +194,13 @@ static void ConnectRemoteObject(pRemoteOBJ self) node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror",NULL); - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); } status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1); self->connected = 1; } @@ -291,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, pUpdateCallback uppi = (pUpdateCallback)userData; hdbDataMessage *mm = NULL; pDynString text; - char *prefix = {"SROC:hupdate "}; - char *postfix= {":EROC\r\n"}; + char *prefix = {"hupdate "}; + char *postfix= {" \r\n"}; char *txt = NULL; int length; pHdbPropertyChange propChange = NULL; @@ -330,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, if(!SCisConnected(uppi->sendCon)){ return hdbKill; } - length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + + length = strlen("hdelprop ") + strlen(uppi->remotePath) + strlen(propChange->key) + 10; if(propChange->value != NULL){ length += strlen(propChange->value); @@ -340,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, return hdbContinue; } if(propChange->value == NULL){ - snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, + snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath, propChange->key,postfix); } else { - snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, + snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath, propChange->key,propChange->value, postfix); } SCWrite(uppi->sendCon,txt,eValue); @@ -469,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Install a callback on the remote node to update the master. The remote should * then immediatly send an update which will be processed by the read callback. */ - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); return 1; } @@ -513,10 +451,10 @@ static int HeartbeatTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; int status; - char command[] = {"Poch\r\n"}; + char command[] = {"contextdo 8437 Poch\r\n"}; if (time(NULL) > self->nextHeartbeat){ - status = ANETwrite(self->readHandle,command, strlen(command)); + status = ANETwrite(self->handle,command, strlen(command)); if(status != 1){ traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); self->connected = 0; @@ -561,11 +499,11 @@ static int WriteResponseTask(void *pData) OutCode eOut; writeData WD; - if(!ANETvalidHandle(self->writeHandle)) { + if(!ANETvalidHandle(self->handle)) { return 1; } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); while(length > 0){ json_tokener_reset(self->jtok); message = json_tokener_parse_ex(self->jtok,pText,length); @@ -575,19 +513,19 @@ static int WriteResponseTask(void *pData) } else if(tokerr != json_tokener_success) { traceIO("RO","JSON parsing error %s on %s from %s %d", json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } if(json_object_get_type(message) != json_type_object) { traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } /* we need to consume here what has been parsed. The char_offset in the tokenizer structure might tell us that... */ - ANETreadConsume(self->writeHandle,self->jtok->char_offset); + ANETreadConsume(self->handle,self->jtok->char_offset); /* @@ -616,6 +554,24 @@ static int WriteResponseTask(void *pData) pText = (char *)json_object_get_string(data); traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); + + /* + do nothing on Poch + */ + if(transID == POCHACT){ + pText = ANETreadPtr(self->handle,&length); + continue; + } + + /* + process update messages + */ + if(transID == READACT){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + traceIO("RO","Received %s from remote",pText); + pText = ANETreadPtr(self->handle,&length); + continue; + } status = LLDnodePtr2First(self->writeList); while(status == 1){ @@ -646,7 +602,7 @@ static int WriteResponseTask(void *pData) } status = LLDnodePtr2Next(self->writeList); } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); } return 1; @@ -701,7 +657,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); LLDblobAppend(self->writeList,&WD,sizeof(writeData)); - status = ANETwrite(self->writeHandle,command,strlen(command)); + status = ANETwrite(self->handle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -927,7 +883,7 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); - snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status; From c60e9920c5d1e36261fdbc4ac639f11fab3e4b61 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Wed, 20 May 2015 08:55:49 +0200 Subject: [PATCH 11/13] First working verion of remote object Fixed two bugs along the way: - asynnet: ANETReadPtr now sets length to 0 if ther eis no data - motorsec: if the motor does not start the status is set to error --- asynnet.c | 1 + motorsec.c | 2 +- remoteobject.c | 97 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 65 insertions(+), 35 deletions(-) diff --git a/asynnet.c b/asynnet.c index 46592d9b..7b707e61 100644 --- a/asynnet.c +++ b/asynnet.c @@ -581,6 +581,7 @@ void *ANETreadPtr(int handle, int *length) con = findSocketDescriptor(handle); if (con == NULL) { + *length = 0; return NULL; } else { data = GetRWBufferData(con->readBuffer, length); diff --git a/motorsec.c b/motorsec.c index 0288e4bc..723326f7 100644 --- a/motorsec.c +++ b/motorsec.c @@ -527,7 +527,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, SCSetInterrupt(pCon, eAbortBatch); self->pDrivInt->iErrorCount = 0; child = GetHipadabaNode(self->pDescriptor->parNode, "status"); - UpdateHipadabaPar(child, MakeHdbText("run"), pCon); + UpdateHipadabaPar(child, MakeHdbText("error"), pCon); return hdbAbort; } diff --git a/remoteobject.c b/remoteobject.c index af72556e..0e78a3a4 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -489,6 +489,44 @@ static OutCode findOutCode(char *txt) return eValue; } /*-----------------------------------------------------------------------------------*/ +static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText) +{ + int status; + writeData WD; + + status = LLDnodePtr2First(writeList); + while(status == 1){ + LLDblobData(writeList,&WD); + if(WD.transID == transID){ + if(strstr(pText,"COMSTART") != NULL){ + /* skip */ + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; + } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { + /* skip */ + return; + } else if(strstr(pText,"TASKSTART") != NULL){ + WD.waitTask = 1 ; + LLDblobDelete(writeList); + LLDblobAppend(writeList,&WD, sizeof(writeData)); + return; + } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ + SCDeleteConnection(WD.pCon); + LLDblobDelete(writeList); + return; + } else { + if(strstr(pText,"OK") == NULL){ + SCWrite(WD.pCon,pText,eOut); + } + return; + } + } + status = LLDnodePtr2Next(writeList); + } + } +/*-----------------------------------------------------------------------------------*/ static int WriteResponseTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; @@ -560,6 +598,7 @@ static int WriteResponseTask(void *pData) */ if(transID == POCHACT){ pText = ANETreadPtr(self->handle,&length); + json_object_put(message); continue; } @@ -567,41 +606,21 @@ static int WriteResponseTask(void *pData) process update messages */ if(transID == READACT){ - InterpExecute(pServ->pSics,pServ->dummyCon,pText); + if(strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + } traceIO("RO","Received %s from remote",pText); pText = ANETreadPtr(self->handle,&length); + json_object_put(message); continue; } + + /* + check write List + */ + CheckWriteList(self->writeList,transID,eOut,pText); + json_object_put(message); - status = LLDnodePtr2First(self->writeList); - while(status == 1){ - LLDblobData(self->writeList,&WD); - if(WD.transID == transID){ - if(strstr(pText,"COMSTART") != NULL){ - /* skip */ - } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) { - SCDeleteConnection(WD.pCon); - LLDblobDelete(self->writeList); - return 1; - } else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) { - /* skip */ - return 1; - } else if(strstr(pText,"TASKSTART") != NULL){ - WD.waitTask = 1 ; - LLDblobDelete(self->writeList); - LLDblobAppend(self->writeList,&WD, sizeof(writeData)); - return 1; - } else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){ - SCDeleteConnection(WD.pCon); - LLDblobDelete(self->writeList); - return 1; - } else { - SCWrite(WD.pCon,pText,eOut); - return 1; - } - } - status = LLDnodePtr2Next(self->writeList); - } pText = ANETreadPtr(self->handle,&length); } @@ -661,10 +680,15 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, free(command); DeleteDynString(data); if(status < 0){ - if(pCon != NULL){ - SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); + self->connected = 0; + ConnectRemoteObject(self); + if(self->connected == 0){ + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", + remoteNode, self->host); + } + return hdbAbort; } - return hdbAbort; } return hdbContinue; } @@ -694,6 +718,11 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) SetHdbProperty(localNode,"remotewrite",rd.remoteNode); AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, self,NULL)); + /* + TODO: The connected write nodes should be held in a list in order to be able to + remove the write callbacks when deleting the remote object. As removing remote + objects usually only happens when SICS shuts down this is not so important. + */ /* * Get information about the remote node and check compatability @@ -764,7 +793,7 @@ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) */ memset(answer,0,sizeof(answer)-1); status = transactCommand(self->transactHandle,command,answer,sizeof(answer)); - if(status){ + if(status == 1){ SCWrite(pCon,answer,eValue); } else { SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); From 69d44bc4951baa7be22e084382924007dfe5c849 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Fri, 5 Jun 2015 08:28:03 +0200 Subject: [PATCH 12/13] Fixed bad -9999.9 after error message when geterror was set on a root level SICSOBJ node --- sicsobj.c | 1 - 1 file changed, 1 deletion(-) diff --git a/sicsobj.c b/sicsobj.c index 330e5933..8f08d396 100644 --- a/sicsobj.c +++ b/sicsobj.c @@ -477,7 +477,6 @@ int InvokeSICSOBJ(SConnection * pCon, SicsInterp * pSics, void *pData, status = GetHdbProperty(parNode,"geterror",buffer,sizeof(buffer)); if (status == 1 && strstr(buffer,"none") == NULL){ SCPrintf(pCon,eValue,"ERROR: %s on last read of %s", buffer, argv[0]); - SCPrintf(pCon,eValue,"%s = -99999", argv[0]); return 0; } status = GetHipadabaPar(parNode, &data, pCon); From 5ad786b3b4dc621be50da357b837892eb6a44533 Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Fri, 5 Jun 2015 08:29:49 +0200 Subject: [PATCH 13/13] Small adaptions to output for TRICS ccl files --- fourmess.c | 3 ++- sicsobj.c | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fourmess.c b/fourmess.c index c497e21e..ac72764a 100644 --- a/fourmess.c +++ b/fourmess.c @@ -411,6 +411,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon, SCWrite(pCon, "ERROR: store: no files open", eLogError); return 0; } + priv->count++; /* get necessary data */ fSum = 0.; @@ -497,7 +498,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon, fPreset, fTemp, prot, pBueffel); } else { fprintf(priv->profFile, "%3d %7.4f %9.0f %7.3f %12f %s %s\n", iNP, fStep, - fPreset, fTemp, prot, extra, pBueffel); + fPreset, fTemp, prot, pBueffel,extra); } for (i = 0; i < iNP; i++) { for (ii = 0; ii < 10 && i < iNP; ii++) { diff --git a/sicsobj.c b/sicsobj.c index 330e5933..8f08d396 100644 --- a/sicsobj.c +++ b/sicsobj.c @@ -477,7 +477,6 @@ int InvokeSICSOBJ(SConnection * pCon, SicsInterp * pSics, void *pData, status = GetHdbProperty(parNode,"geterror",buffer,sizeof(buffer)); if (status == 1 && strstr(buffer,"none") == NULL){ SCPrintf(pCon,eValue,"ERROR: %s on last read of %s", buffer, argv[0]); - SCPrintf(pCon,eValue,"%s = -99999", argv[0]); return 0; } status = GetHipadabaPar(parNode, &data, pCon);