From 9517a6d14e827bea53bf02d8f43d4bac48e616fd Mon Sep 17 00:00:00 2001 From: Koennecke Mark Date: Tue, 12 May 2015 08:36:38 +0200 Subject: [PATCH] Switched remoteobject to use a single connection with ACT --- conman.c | 4 +- remoteobject.c | 144 +++++++++++++++++-------------------------------- 2 files changed, 52 insertions(+), 96 deletions(-) diff --git a/conman.c b/conman.c index 86a0c163..8d99bb05 100644 --- a/conman.c +++ b/conman.c @@ -1098,7 +1098,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 0; } - if(self->iProtocolID == 4) { /* act */ + if(self->iProtocolID == PROTACT) { /* act */ if (strlen(buffer) + 30 > 1024) { pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); memset(pPtr, 0, strlen(buffer) + 20); @@ -1110,7 +1110,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } - } else if(self->iProtocolID == 3) { + } else if(self->iProtocolID == PROTJSON) { myJson = mkJSON_Object(self,buffer,iOut); if(myJson != NULL){ SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); diff --git a/remoteobject.c b/remoteobject.c index 2eb19559..af72556e 100644 --- a/remoteobject.c +++ b/remoteobject.c @@ -36,6 +36,9 @@ #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ +#define READACT 7654 +#define POCHACT 8437 + static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); @@ -44,8 +47,7 @@ static int transactionID = 100000; typedef struct { char *host; int port; - int readHandle; - int writeHandle; + int handle; int transactHandle; int readList; int writeList; @@ -79,69 +81,13 @@ void KillRemoteOBJ(void *data) snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); StopTask(pServ->pTasker,roTaskName); free(self->host); - ANETclose(self->readHandle); - ANETclose(self->writeHandle); + ANETclose(self->handle); ANETclose(self->transactHandle); LLDdeleteBlob(self->readList); LLDdeleteBlob(self->writeList); json_tokener_free(self->jtok); } } -/*========================= reading related code ================================*/ -static int RemoteReadCallback(int handle, void *userData) -{ - int length; - char *pPtr, *pStart, *pEnd; - - pPtr = ANETreadPtr(handle,&length); - - /* - * deal with command results - */ - pStart = strstr(pPtr, "TRANSACTIONSTART"); - pEnd = strstr(pPtr,"TRANSACTIONEND"); - if(pStart != NULL && pEnd != NULL){ - pStart = pStart + strlen("TRANSACTIONSTART"); - *pEnd = '\0'; - traceIO("RO","Received command - reply: %s", pStart); - pEnd += strlen("TRANSACTIONEND"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with update messages - */ - pStart = strstr(pPtr, "SROC:"); - pEnd = strstr(pPtr,":EROC\r\n"); - if(pStart != NULL && pEnd != NULL){ - pStart += strlen("SROC:"); - *pEnd = '\0'; - InterpExecute(pServ->pSics, pServ->dummyCon,pStart); - traceIO("RO", "Received %s from remote", pStart); - pEnd += strlen("EROC\r\n"); - ANETreadConsume(handle,pEnd - pPtr); - } - - /* - * deal with heartbeats - */ - if((pStart = strstr(pPtr,"Poch")) != NULL){ - ANETreadConsume(handle,(pStart+4) - pPtr); - } - - /* - If there is more stuff to process: recurse - */ - pPtr = ANETreadPtr(handle,&length); - if(length > 0 && - ( strstr(pPtr,":EROC\r\n") != NULL || - strstr(pPtr,"TRANSACTIONEND") != NULL - || strstr(pPtr,"Poch") != NULL ) ) { - RemoteReadCallback(handle,userData); - } - - return 1; -} /*-----------------------------------------------------------------------------*/ static int transactCommand(int handle, char *command, char *reply, int replyLen) { @@ -209,10 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self) return; } - self->writeHandle = ANETconnect(self->host, self->port); - self->readHandle = ANETconnect(self->host, self->port); + self->handle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port); - if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ + if(self->handle < 0 || self->transactHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); @@ -225,26 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self) Default login with hard coded manager login. Defined in nserver.c */ - ANETwrite(self->readHandle,login,strlen(login)); - ANETwrite(self->writeHandle,login,strlen(login)); + ANETwrite(self->handle,login,strlen(login)); ANETwrite(self->transactHandle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ - pPtr = ANETreadPtr(self->readHandle, &length); - ANETreadConsume(self->readHandle,length); - pPtr = ANETreadPtr(self->writeHandle, &length); - ANETreadConsume(self->writeHandle,length); + pPtr = ANETreadPtr(self->handle, &length); + ANETreadConsume(self->handle,length); pPtr = ANETreadPtr(self->transactHandle, &length); ANETreadConsume(self->transactHandle,length); + transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1); - /* - * install the read callback - */ - ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); /* * Remove geterror on read nodes and reinstall callbacks for reconnects @@ -255,14 +194,13 @@ static void ConnectRemoteObject(pRemoteOBJ self) node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror",NULL); - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); } status = LLDnodePtr2Next(self->readList); } - transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1); self->connected = 1; } @@ -291,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, pUpdateCallback uppi = (pUpdateCallback)userData; hdbDataMessage *mm = NULL; pDynString text; - char *prefix = {"SROC:hupdate "}; - char *postfix= {":EROC\r\n"}; + char *prefix = {"hupdate "}; + char *postfix= {" \r\n"}; char *txt = NULL; int length; pHdbPropertyChange propChange = NULL; @@ -330,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, if(!SCisConnected(uppi->sendCon)){ return hdbKill; } - length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + + length = strlen("hdelprop ") + strlen(uppi->remotePath) + strlen(propChange->key) + 10; if(propChange->value != NULL){ length += strlen(propChange->value); @@ -340,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, return hdbContinue; } if(propChange->value == NULL){ - snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, + snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath, propChange->key,postfix); } else { - snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, + snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath, propChange->key,propChange->value, postfix); } SCWrite(uppi->sendCon,txt,eValue); @@ -469,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) * Install a callback on the remote node to update the master. The remote should * then immediatly send an update which will be processed by the read callback. */ - snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", - rd.remoteNode, rd.localNode); - ANETwrite(self->readHandle,command,strlen(command)); + snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n", + READACT, rd.remoteNode, rd.localNode); + ANETwrite(self->handle,command,strlen(command)); return 1; } @@ -513,10 +451,10 @@ static int HeartbeatTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; int status; - char command[] = {"Poch\r\n"}; + char command[] = {"contextdo 8437 Poch\r\n"}; if (time(NULL) > self->nextHeartbeat){ - status = ANETwrite(self->readHandle,command, strlen(command)); + status = ANETwrite(self->handle,command, strlen(command)); if(status != 1){ traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); self->connected = 0; @@ -561,11 +499,11 @@ static int WriteResponseTask(void *pData) OutCode eOut; writeData WD; - if(!ANETvalidHandle(self->writeHandle)) { + if(!ANETvalidHandle(self->handle)) { return 1; } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); while(length > 0){ json_tokener_reset(self->jtok); message = json_tokener_parse_ex(self->jtok,pText,length); @@ -575,19 +513,19 @@ static int WriteResponseTask(void *pData) } else if(tokerr != json_tokener_success) { traceIO("RO","JSON parsing error %s on %s from %s %d", json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } if(json_object_get_type(message) != json_type_object) { traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); - ANETreadConsume(self->writeHandle,length); + ANETreadConsume(self->handle,length); return 1; } /* we need to consume here what has been parsed. The char_offset in the tokenizer structure might tell us that... */ - ANETreadConsume(self->writeHandle,self->jtok->char_offset); + ANETreadConsume(self->handle,self->jtok->char_offset); /* @@ -616,6 +554,24 @@ static int WriteResponseTask(void *pData) pText = (char *)json_object_get_string(data); traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); + + /* + do nothing on Poch + */ + if(transID == POCHACT){ + pText = ANETreadPtr(self->handle,&length); + continue; + } + + /* + process update messages + */ + if(transID == READACT){ + InterpExecute(pServ->pSics,pServ->dummyCon,pText); + traceIO("RO","Received %s from remote",pText); + pText = ANETreadPtr(self->handle,&length); + continue; + } status = LLDnodePtr2First(self->writeList); while(status == 1){ @@ -646,7 +602,7 @@ static int WriteResponseTask(void *pData) } status = LLDnodePtr2Next(self->writeList); } - pText = ANETreadPtr(self->writeHandle,&length); + pText = ANETreadPtr(self->handle,&length); } return 1; @@ -701,7 +657,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); LLDblobAppend(self->writeList,&WD,sizeof(writeData)); - status = ANETwrite(self->writeHandle,command,strlen(command)); + status = ANETwrite(self->handle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ @@ -927,7 +883,7 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); - snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); + snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); return status;