Switched remoteobject to use a single connection with ACT

This commit is contained in:
2015-05-12 08:36:38 +02:00
parent 4865902a97
commit 9517a6d14e
2 changed files with 52 additions and 96 deletions

View File

@ -1098,7 +1098,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
return 0; return 0;
} }
if(self->iProtocolID == 4) { /* act */ if(self->iProtocolID == PROTACT) { /* act */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1110,7 +1110,7 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == 3) { } else if(self->iProtocolID == PROTJSON) {
myJson = mkJSON_Object(self,buffer,iOut); myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){ if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson)); SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));

View File

@ -36,6 +36,9 @@
#define OOM -5001 /* out of memory */ #define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */ #define TO -5002 /* timeout */
#define READACT 7654
#define POCHACT 8437
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
extern char *trim(char *txt); extern char *trim(char *txt);
@ -44,8 +47,7 @@ static int transactionID = 100000;
typedef struct { typedef struct {
char *host; char *host;
int port; int port;
int readHandle; int handle;
int writeHandle;
int transactHandle; int transactHandle;
int readList; int readList;
int writeList; int writeList;
@ -79,69 +81,13 @@ void KillRemoteOBJ(void *data)
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
StopTask(pServ->pTasker,roTaskName); StopTask(pServ->pTasker,roTaskName);
free(self->host); free(self->host);
ANETclose(self->readHandle); ANETclose(self->handle);
ANETclose(self->writeHandle);
ANETclose(self->transactHandle); ANETclose(self->transactHandle);
LLDdeleteBlob(self->readList); LLDdeleteBlob(self->readList);
LLDdeleteBlob(self->writeList); LLDdeleteBlob(self->writeList);
json_tokener_free(self->jtok); json_tokener_free(self->jtok);
} }
} }
/*========================= reading related code ================================*/
static int RemoteReadCallback(int handle, void *userData)
{
int length;
char *pPtr, *pStart, *pEnd;
pPtr = ANETreadPtr(handle,&length);
/*
* deal with command results
*/
pStart = strstr(pPtr, "TRANSACTIONSTART");
pEnd = strstr(pPtr,"TRANSACTIONEND");
if(pStart != NULL && pEnd != NULL){
pStart = pStart + strlen("TRANSACTIONSTART");
*pEnd = '\0';
traceIO("RO","Received command - reply: %s", pStart);
pEnd += strlen("TRANSACTIONEND");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with update messages
*/
pStart = strstr(pPtr, "SROC:");
pEnd = strstr(pPtr,":EROC\r\n");
if(pStart != NULL && pEnd != NULL){
pStart += strlen("SROC:");
*pEnd = '\0';
InterpExecute(pServ->pSics, pServ->dummyCon,pStart);
traceIO("RO", "Received %s from remote", pStart);
pEnd += strlen("EROC\r\n");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with heartbeats
*/
if((pStart = strstr(pPtr,"Poch")) != NULL){
ANETreadConsume(handle,(pStart+4) - pPtr);
}
/*
If there is more stuff to process: recurse
*/
pPtr = ANETreadPtr(handle,&length);
if(length > 0 &&
( strstr(pPtr,":EROC\r\n") != NULL ||
strstr(pPtr,"TRANSACTIONEND") != NULL
|| strstr(pPtr,"Poch") != NULL ) ) {
RemoteReadCallback(handle,userData);
}
return 1;
}
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
static int transactCommand(int handle, char *command, char *reply, int replyLen) static int transactCommand(int handle, char *command, char *reply, int replyLen)
{ {
@ -209,10 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self)
return; return;
} }
self->writeHandle = ANETconnect(self->host, self->port); self->handle = ANETconnect(self->host, self->port);
self->readHandle = ANETconnect(self->host, self->port);
self->transactHandle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ if(self->handle < 0 || self->transactHandle < 0){
self->connected = 0; self->connected = 0;
traceIO("RO","Failed to connect to remote objects at %s, port %d", traceIO("RO","Failed to connect to remote objects at %s, port %d",
self->host, self->port); self->host, self->port);
@ -225,26 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self)
Default login with hard coded manager login. Defined in Default login with hard coded manager login. Defined in
nserver.c nserver.c
*/ */
ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->handle,login,strlen(login));
ANETwrite(self->writeHandle,login,strlen(login));
ANETwrite(self->transactHandle,login,strlen(login)); ANETwrite(self->transactHandle,login,strlen(login));
usleep(500); usleep(500);
ANETprocess(); ANETprocess();
/* /*
eat the login responses eat the login responses
*/ */
pPtr = ANETreadPtr(self->readHandle, &length); pPtr = ANETreadPtr(self->handle, &length);
ANETreadConsume(self->readHandle,length); ANETreadConsume(self->handle,length);
pPtr = ANETreadPtr(self->writeHandle, &length);
ANETreadConsume(self->writeHandle,length);
pPtr = ANETreadPtr(self->transactHandle, &length); pPtr = ANETreadPtr(self->transactHandle, &length);
ANETreadConsume(self->transactHandle,length); ANETreadConsume(self->transactHandle,length);
transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1);
/*
* install the read callback
*/
ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL);
/* /*
* Remove geterror on read nodes and reinstall callbacks for reconnects * Remove geterror on read nodes and reinstall callbacks for reconnects
@ -255,14 +194,13 @@ static void ConnectRemoteObject(pRemoteOBJ self)
node = FindHdbNode(NULL,rd.localNode,NULL); node = FindHdbNode(NULL,rd.localNode,NULL);
if(node != NULL){ if(node != NULL){
SetHdbProperty(node,"geterror",NULL); SetHdbProperty(node,"geterror",NULL);
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode); READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command)); ANETwrite(self->handle,command,strlen(command));
} }
status = LLDnodePtr2Next(self->readList); status = LLDnodePtr2Next(self->readList);
} }
transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1);
self->connected = 1; self->connected = 1;
} }
@ -291,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
pUpdateCallback uppi = (pUpdateCallback)userData; pUpdateCallback uppi = (pUpdateCallback)userData;
hdbDataMessage *mm = NULL; hdbDataMessage *mm = NULL;
pDynString text; pDynString text;
char *prefix = {"SROC:hupdate "}; char *prefix = {"hupdate "};
char *postfix= {":EROC\r\n"}; char *postfix= {" \r\n"};
char *txt = NULL; char *txt = NULL;
int length; int length;
pHdbPropertyChange propChange = NULL; pHdbPropertyChange propChange = NULL;
@ -330,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
if(!SCisConnected(uppi->sendCon)){ if(!SCisConnected(uppi->sendCon)){
return hdbKill; return hdbKill;
} }
length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + length = strlen("hdelprop ") + strlen(uppi->remotePath) +
strlen(propChange->key) + 10; strlen(propChange->key) + 10;
if(propChange->value != NULL){ if(propChange->value != NULL){
length += strlen(propChange->value); length += strlen(propChange->value);
@ -340,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
return hdbContinue; return hdbContinue;
} }
if(propChange->value == NULL){ if(propChange->value == NULL){
snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath,
propChange->key,postfix); propChange->key,postfix);
} else { } else {
snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath,
propChange->key,propChange->value, postfix); propChange->key,propChange->value, postfix);
} }
SCWrite(uppi->sendCon,txt,eValue); SCWrite(uppi->sendCon,txt,eValue);
@ -469,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Install a callback on the remote node to update the master. The remote should * Install a callback on the remote node to update the master. The remote should
* then immediatly send an update which will be processed by the read callback. * then immediatly send an update which will be processed by the read callback.
*/ */
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode); READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command)); ANETwrite(self->handle,command,strlen(command));
return 1; return 1;
} }
@ -513,10 +451,10 @@ static int HeartbeatTask(void *pData)
{ {
pRemoteOBJ self = (pRemoteOBJ)pData; pRemoteOBJ self = (pRemoteOBJ)pData;
int status; int status;
char command[] = {"Poch\r\n"}; char command[] = {"contextdo 8437 Poch\r\n"};
if (time(NULL) > self->nextHeartbeat){ if (time(NULL) > self->nextHeartbeat){
status = ANETwrite(self->readHandle,command, strlen(command)); status = ANETwrite(self->handle,command, strlen(command));
if(status != 1){ if(status != 1){
traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port);
self->connected = 0; self->connected = 0;
@ -561,11 +499,11 @@ static int WriteResponseTask(void *pData)
OutCode eOut; OutCode eOut;
writeData WD; writeData WD;
if(!ANETvalidHandle(self->writeHandle)) { if(!ANETvalidHandle(self->handle)) {
return 1; return 1;
} }
pText = ANETreadPtr(self->writeHandle,&length); pText = ANETreadPtr(self->handle,&length);
while(length > 0){ while(length > 0){
json_tokener_reset(self->jtok); json_tokener_reset(self->jtok);
message = json_tokener_parse_ex(self->jtok,pText,length); message = json_tokener_parse_ex(self->jtok,pText,length);
@ -575,19 +513,19 @@ static int WriteResponseTask(void *pData)
} else if(tokerr != json_tokener_success) { } else if(tokerr != json_tokener_success) {
traceIO("RO","JSON parsing error %s on %s from %s %d", traceIO("RO","JSON parsing error %s on %s from %s %d",
json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset); json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset);
ANETreadConsume(self->writeHandle,length); ANETreadConsume(self->handle,length);
return 1; return 1;
} }
if(json_object_get_type(message) != json_type_object) { if(json_object_get_type(message) != json_type_object) {
traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host); traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
ANETreadConsume(self->writeHandle,length); ANETreadConsume(self->handle,length);
return 1; return 1;
} }
/* /*
we need to consume here what has been parsed. we need to consume here what has been parsed.
The char_offset in the tokenizer structure might tell us that... The char_offset in the tokenizer structure might tell us that...
*/ */
ANETreadConsume(self->writeHandle,self->jtok->char_offset); ANETreadConsume(self->handle,self->jtok->char_offset);
/* /*
@ -616,6 +554,24 @@ static int WriteResponseTask(void *pData)
pText = (char *)json_object_get_string(data); pText = (char *)json_object_get_string(data);
traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText); traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);
/*
do nothing on Poch
*/
if(transID == POCHACT){
pText = ANETreadPtr(self->handle,&length);
continue;
}
/*
process update messages
*/
if(transID == READACT){
InterpExecute(pServ->pSics,pServ->dummyCon,pText);
traceIO("RO","Received %s from remote",pText);
pText = ANETreadPtr(self->handle,&length);
continue;
}
status = LLDnodePtr2First(self->writeList); status = LLDnodePtr2First(self->writeList);
while(status == 1){ while(status == 1){
@ -646,7 +602,7 @@ static int WriteResponseTask(void *pData)
} }
status = LLDnodePtr2Next(self->writeList); status = LLDnodePtr2Next(self->writeList);
} }
pText = ANETreadPtr(self->writeHandle,&length); pText = ANETreadPtr(self->handle,&length);
} }
return 1; return 1;
@ -701,7 +657,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
*/ */
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
LLDblobAppend(self->writeList,&WD,sizeof(writeData)); LLDblobAppend(self->writeList,&WD,sizeof(writeData));
status = ANETwrite(self->writeHandle,command,strlen(command)); status = ANETwrite(self->handle,command,strlen(command));
free(command); free(command);
DeleteDynString(data); DeleteDynString(data);
if(status < 0){ if(status < 0){
@ -927,7 +883,7 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1); TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
return status; return status;