- Rewrote the write part of remoteobject

- With transactionID > 10^6, now termination messages are sent by contextdo
  and tasks
- This still has bugs
This commit is contained in:
2015-04-23 15:52:05 +02:00
parent b8fbe22fc0
commit b7eaa538ed
4 changed files with 167 additions and 199 deletions

View File

@ -1744,7 +1744,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand)
memset(pBueffel, 0, 80); memset(pBueffel, 0, 80);
stptok(trim(pCommand), pBueffel, 79, " "); stptok(trim(pCommand), pBueffel, 79, " ");
self->iCmdCtr++; self->iCmdCtr++;
if (999999 < self->iCmdCtr) { if (self->iCmdCtr > 99998) {
self->iCmdCtr = 0; self->iCmdCtr = 0;
} }
self->transID = self->iCmdCtr; self->transID = self->iCmdCtr;

View File

@ -241,10 +241,10 @@ static int DriveTaskFunc(void *data)
} else { } else {
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
} }
if(taskData->pCon->remote){
SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name);
}
traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); traceSys("drive","DriveTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
}
return 0; return 0;
} }
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
@ -275,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget)
ExeInterest(pServ->pExecutor,name,"started"); ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name); DevexecLog("START",name);
InvokeNewTarget(pServ->pExecutor,name,fTarget); InvokeNewTarget(pServ->pExecutor,name,fTarget);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = DRIVEID; taskData->id = DRIVEID;
taskData->obj = obj; taskData->obj = obj;
@ -396,8 +399,8 @@ static int CountTaskFunc(void *data)
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
} }
traceSys("count","CountTask %s finished with state %d", taskData->name,status); traceSys("count","CountTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->remote){ if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eValue,"TASKFINISHED:%s", taskData->name); SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
} }
return 0; return 0;
} }
@ -424,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name)
} }
ExeInterest(pServ->pExecutor,name,"started"); ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name); DevexecLog("START",name);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = COUNTID; taskData->id = COUNTID;
taskData->obj = obj; taskData->obj = obj;

View File

@ -188,7 +188,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData,
SCWrite(pCon, "ERROR: no more memory", eError); SCWrite(pCon, "ERROR: no more memory", eError);
return 0; return 0;
} }
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID);
}
status = InterpExecute(pSics, comCon, command); status = InterpExecute(pSics, comCon, command);
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMEND %d", comCon->transID);
}
if (command != buffer) if (command != buffer)
free(command); free(command);
SCDeleteConnection(comCon); SCDeleteConnection(comCon);

View File

@ -2,7 +2,7 @@
* Remote objects in sicsobj. This means accessing remote objects in a different * Remote objects in sicsobj. This means accessing remote objects in a different
* SICS server from a master SICS server. * SICS server from a master SICS server.
* *
* Reading is implementd according to this scheme: * Reading is implemented according to this scheme:
* *
* * When a read connection is made between a local node and a remote node in slave, then a * * When a read connection is made between a local node and a remote node in slave, then a
* callback is installed on remote node in slave. * callback is installed on remote node in slave.
@ -31,22 +31,27 @@
#include <lld_blob.h> #include <lld_blob.h>
#include <dynstring.h> #include <dynstring.h>
#include <stptok.h> #include <stptok.h>
#include <json/json.h>
#define OOM -5001 /* out of memory */ #define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */ #define TO -5002 /* timeout */
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
extern char *trim(char *txt); extern char *trim(char *txt);
static int transactionID = 100000;
/*---------------------- our very private data structure -------------------*/ /*---------------------- our very private data structure -------------------*/
typedef struct { typedef struct {
char *host; char *host;
int port; int port;
int readHandle; int readHandle;
int writeHandle; int writeHandle;
int writeInUse; int transactHandle;
int readList; int readList;
int writeList;
unsigned int connected; unsigned int connected;
time_t nextHeartbeat; time_t nextHeartbeat;
struct json_tokener *jtok;
} RemoteOBJ, *pRemoteOBJ; } RemoteOBJ, *pRemoteOBJ;
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
typedef struct { typedef struct {
@ -59,6 +64,12 @@ typedef struct {
char *remotePath; char *remotePath;
} UpdateCallback, *pUpdateCallback; } UpdateCallback, *pUpdateCallback;
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
typedef struct {
int transID;
SConnection *pCon;
int waitTask;
}writeData, *pWriteData;
/*----------------------------------------------------------------------------*/
void KillRemoteOBJ(void *data) void KillRemoteOBJ(void *data)
{ {
char roTaskName[132]; char roTaskName[132];
@ -70,7 +81,10 @@ void KillRemoteOBJ(void *data)
free(self->host); free(self->host);
ANETclose(self->readHandle); ANETclose(self->readHandle);
ANETclose(self->writeHandle); ANETclose(self->writeHandle);
ANETclose(self->transactHandle);
LLDdeleteBlob(self->readList); LLDdeleteBlob(self->readList);
LLDdeleteBlob(self->writeList);
json_tokener_free(self->jtok);
} }
} }
/*========================= reading related code ================================*/ /*========================= reading related code ================================*/
@ -135,7 +149,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
char *prefix = {"transact "}; char *prefix = {"transact "};
int status, length, type; int status, length, type;
time_t start; time_t start;
char *pPtr; char *pPtr, *pEnd;
/* /*
* read possible dirt of the line * read possible dirt of the line
@ -144,12 +158,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
ANETreadConsume(handle,length); ANETreadConsume(handle,length);
toSend = malloc(strlen(command) + strlen(prefix) + 1); toSend = malloc(strlen(command) + strlen(prefix) + 10);
if(toSend == NULL){ if(toSend == NULL){
return OOM; return OOM;
} }
strcpy(toSend, prefix); strcpy(toSend, prefix);
strcat(toSend, command); strcat(toSend, command);
if(strstr(command,"\n") == NULL){
strcat(toSend,"\r\n");
}
status = ANETwrite(handle,toSend,strlen(toSend)); status = ANETwrite(handle,toSend,strlen(toSend));
free(toSend); free(toSend);
if(status != 1){ if(status != 1){
@ -163,7 +180,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
while(time(NULL) < start + 2.0){ while(time(NULL) < start + 2.0){
ANETprocess(); ANETprocess();
pPtr = ANETreadPtr(handle,&length); pPtr = ANETreadPtr(handle,&length);
if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){
*pEnd = '\0';
strncpy(reply,pPtr,replyLen); strncpy(reply,pPtr,replyLen);
ANETreadConsume(handle,length); ANETreadConsume(handle,length);
return 1; return 1;
@ -193,7 +211,8 @@ static void ConnectRemoteObject(pRemoteOBJ self)
self->readHandle = ANETconnect(self->host, self->port); self->readHandle = ANETconnect(self->host, self->port);
self->writeHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0){ self->transactHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){
self->connected = 0; self->connected = 0;
traceIO("RO","Failed to connect to remote objects at %s, port %d", traceIO("RO","Failed to connect to remote objects at %s, port %d",
self->host, self->port); self->host, self->port);
@ -208,6 +227,7 @@ static void ConnectRemoteObject(pRemoteOBJ self)
*/ */
ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->readHandle,login,strlen(login));
ANETwrite(self->writeHandle,login,strlen(login)); ANETwrite(self->writeHandle,login,strlen(login));
ANETwrite(self->transactHandle,login,strlen(login));
usleep(500); usleep(500);
ANETprocess(); ANETprocess();
/* /*
@ -217,6 +237,8 @@ static void ConnectRemoteObject(pRemoteOBJ self)
ANETreadConsume(self->readHandle,length); ANETreadConsume(self->readHandle,length);
pPtr = ANETreadPtr(self->writeHandle, &length); pPtr = ANETreadPtr(self->writeHandle, &length);
ANETreadConsume(self->writeHandle,length); ANETreadConsume(self->writeHandle,length);
pPtr = ANETreadPtr(self->transactHandle, &length);
ANETreadConsume(self->transactHandle,length);
/* /*
@ -240,11 +262,9 @@ static void ConnectRemoteObject(pRemoteOBJ self)
status = LLDnodePtr2Next(self->readList); status = LLDnodePtr2Next(self->readList);
} }
transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)-1); transactCommand(self->writeHandle,"protocol set json\r\n", command,sizeof(command)-1);
transactCommand(self->writeHandle,"config remote\r\n",command,sizeof(command)-1);
self->connected = 1; self->connected = 1;
self->writeInUse = 0;
} }
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
static void MarkDisconnected(pRemoteOBJ self) static void MarkDisconnected(pRemoteOBJ self)
@ -398,7 +418,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Get information about the remote node and check compatability * Get information about the remote node and check compatability
*/ */
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
/* /*
* try a reconnect, * try a reconnect,
@ -408,7 +428,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
*/ */
self->connected = 0; self->connected = 0;
ConnectRemoteObject(self); ConnectRemoteObject(self);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host); self->host);
@ -510,11 +530,13 @@ static int HeartbeatTask(void *pData)
return 1; return 1;
} }
/*============================= writing related code =========================== /*============================= writing related code ===========================
The logic here is to use the standard writeHandle when available. I expect most This works by sending the command via contextdo with a ID > 10^6. This causes
communication to be short and to happen through the writeHandle. If that one is the remote SICS to send the termination messages. The transaction IDs together
in use, a new connection will be built. with the connection responsible for it are kept in a list.
---------------------------------------------------------------------------------
suppress all superfluous OK from the slave This list is used by the write task to forward messages properly and for handling
termination.
-----------------------------------------------------------------------------------*/ -----------------------------------------------------------------------------------*/
#include <outcode.c> #include <outcode.c>
static OutCode findOutCode(char *txt) static OutCode findOutCode(char *txt)
@ -528,94 +550,111 @@ static OutCode findOutCode(char *txt)
} }
return eValue; return eValue;
} }
/*--------------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------------*/
static void printSICS(char *answer, SConnection *pCon) static int WriteResponseTask(void *pData)
{ {
char line[1024], *pPtr, *pCode; pRemoteOBJ self = (pRemoteOBJ)pData;
OutCode eCode; int status, length = 0, transID;
char *pText, *outTxt;
json_object *message = NULL, *data = NULL;
enum json_tokener_error tokerr;
OutCode eOut;
writeData WD;
pPtr = answer; if(!ANETvalidHandle(self->writeHandle)) {
while(pPtr != NULL){ return 1;
memset(line,0,sizeof(line));
pPtr = stptok(pPtr,line,sizeof(line),"\n");
if(strstr(line,"OK") == NULL && strstr(line,"TASKFINISHED") == NULL){
pCode = strstr(line,"@@");
if(pCode != NULL){
*pCode = '\0';
pCode += 2;
eCode = findOutCode(trim(pCode));
} else {
eCode = eValue;
}
SCWrite(pCon,line,eCode);
}
} }
}
/*---------------------------------------------------------------------------------*/
static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle)
{
int handle, length;
char *answer = NULL;
char command[80];
if(self->writeInUse) { pText = ANETreadPtr(self->writeHandle,&length);
handle = ANETconnect(self->host,self->port); if(length > 0){
if(handle < 0){ json_tokener_reset(self->jtok);
traceIO("RO","Failed to connect to %s at %d", self->host, self->port); message = json_tokener_parse_ex(self->jtok,pText,length);
if(pCon != NULL){ tokerr = self->jtok->err;
SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); if(tokerr == json_tokener_continue){
} return 1;
return handle; } else if(tokerr != json_tokener_success) {
traceIO("RO","JSON parsing error %s on %s from %s",
json_tokener_errors[tokerr], pText, self->host);
ANETreadConsume(self->writeHandle,length);
return 1;
}
if(json_object_get_type(message) != json_type_object) {
traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
ANETreadConsume(self->writeHandle,length);
return 1;
} }
ANETwrite(handle,login,strlen(login));
usleep(500);
ANETprocess();
/* /*
eat the login responses we need to consume here what has been parsed.
The char_offset in the tokenizer structure might tell us that...
*/ */
answer = ANETreadPtr(handle, &length); ANETreadConsume(self->writeHandle,self->jtok->char_offset);
ANETreadConsume(handle,length);
*newHandle = 1;
transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command));
transactCommand(handle,"config remote\r\n",command,sizeof(command)-1);
} else {
self->writeInUse = 1;
handle = self->writeHandle;
/* /*
eat dirt from the line Received a valid message, process
*/ */
answer = ANETreadPtr(handle, &length); data = json_object_object_get(message,"trans");
ANETreadConsume(handle,length); if(data == NULL){
traceIO("RO","No transaction ID found in %s from %s", pText,self->host);
return 1;
}
transID = json_object_get_int(data);
data = json_object_object_get(message,"flag");
if(data == NULL){
traceIO("RO","No flag found in %s from %s", pText,self->host);
return 1;
}
outTxt = (char *)json_object_get_string(data);
eOut = findOutCode(outTxt);
data = json_object_object_get(message,"data");
if(data == NULL){
traceIO("RO","No data found in %s from %s", pText,self->host);
return 1;
}
pText = (char *)json_object_get_string(data);
status = LLDnodePtr2First(self->writeList);
while(status == 1){
LLDblobData(self->writeList,&WD);
if(WD.transID == transID){
if(strstr(pText,"COMSTART") != NULL){
/* skip */
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) {
SCDeleteConnection(WD.pCon);
LLDblobDelete(self->writeList);
return 1;
} else if(strstr(pText,"TASKSTART") != NULL){
WD.waitTask = 1 ;
LLDblobDelete(self->writeList);
LLDblobAppend(self->writeList,&WD, sizeof(writeData));
return 1;
} else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){
SCDeleteConnection(WD.pCon);
LLDblobDelete(self->writeList);
return 1;
} else {
SCWrite(WD.pCon,pText,eOut);
return 1;
}
}
status = LLDnodePtr2Next(self->writeList);
}
} }
return handle;
return 1;
} }
/*---------------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------------*/
static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) static int IncrementTransactionID()
{ {
char *answer = NULL, *pEnd, *command = NULL; transactionID++;
int length; if(transactionID >= 200000){
transactionID = 100000;
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host);
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
printSICS(answer,pCon);
}
traceIO("RO","%s:%d: Received %s", self->host, self->port,answer);
ANETreadConsume(handle,pEnd+strlen("TRANSACTIONFINISHED") - answer);
break;
}
} }
return transactionID;
} }
/*---------------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------------*/
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
@ -628,14 +667,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
pDynString data; pDynString data;
char *remoteNode; char *remoteNode;
char *command, *answer, *pEnd; char *command, *answer, *pEnd;
writeData WD;
if((mm = GetHdbSetMessage(mes)) != NULL){ if((mm = GetHdbSetMessage(mes)) != NULL){
pCon = (SConnection *)mm->callData; pCon = (SConnection *)mm->callData;
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return hdbAbort;
}
/* /*
build the command to send build the command to send
@ -650,13 +685,17 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
} }
return hdbAbort; return hdbAbort;
} }
snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); WD.pCon = SCCopyConnection(pCon);
WD.waitTask = 0;
WD.transID = IncrementTransactionID();
snprintf(command,length,"contextdo %d hset %s %s\r\n",
WD.transID, remoteNode, GetCharArray(data));
/* /*
write write
*/ */
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
status = ANETwrite(handle,command,strlen(command)); status = ANETwrite(self->writeHandle,command,strlen(command));
free(command); free(command);
DeleteDynString(data); DeleteDynString(data);
if(status < 0){ if(status < 0){
@ -665,55 +704,8 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
} }
return hdbAbort; return hdbAbort;
} }
LLDblobAppend(self->writeList,&WD,sizeof(writeData));
/* return hdbContinue;
wait for a response: TRANSACTIONFINISHED
*/
ProcessWriteResponse(self,handle,pCon);
/*
Is there a termination script?
*/
command = GetHdbProp(currentNode,"termscript");
if(command != NULL){
while(1) {
TaskYield(pServ->pTasker);
Tcl_Eval(InterpGetTcl(pServ->pSics),command);
answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics));
if(strstr(answer,"idle") != NULL){
answer = ANETreadPtr(handle,&length);
printSICS(answer,pCon);
traceIO("RO","%s:%d:Received %s", self->host,self->port,answer);
ANETreadConsume(handle,length);
break;
}
}
}
/*
Do I have to wait for a TASKFINISHED?
*/
command = GetHdbProp(currentNode,"taskwait");
if(command != NULL){
while(1) {
TaskYield(pServ->pTasker);
answer = ANETreadPtr(handle,&length);
if(length > 0 && strstr(answer,"TASKFINISHED") != NULL){
printSICS(answer,pCon);
traceIO("RO","%s:%d:Received %s", self->host,self->port,answer);
ANETreadConsume(handle,length);
break;
}
}
}
if(newHandle){
ANETclose(handle);
} else {
self->writeInUse = 0;
}
} }
return hdbContinue; return hdbContinue;
@ -746,7 +738,7 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
* Get information about the remote node and check compatability * Get information about the remote node and check compatability
*/ */
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host); self->host);
@ -803,61 +795,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
/*============================ remote execute =================================*/ /*============================ remote execute =================================*/
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
{ {
int status, handle, newHandle = 0, length; int status;
char *answer, *pEnd; char answer[65536];
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return 0;
}
/* /*
write, thereby taking care to prefix with transact and for proper termination write, thereby taking care to prefix with transact and for proper termination
*/ */
if(strstr(command,"transact") == NULL){ memset(answer,0,sizeof(answer)-1);
ANETwrite(handle,"transact ", sizeof("transact ")); status = transactCommand(self->transactHandle,command,answer,sizeof(answer));
} if(status){
status = ANETwrite(handle,command,strlen(command)); SCWrite(pCon,answer,eValue);
if(strstr(command,"\n") == NULL){
ANETwrite(handle,"\r\n",2);
}
if(status < 0){
traceIO("RO","Disconnect from %s while executing %s", self->host, command);
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
return 0;
}
/*
wait for response
*/
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
SCPrintf(pCon,eValue,answer);
}
ANETreadConsume(handle,length);
break;
}
}
if(newHandle){
ANETclose(handle);
} else { } else {
self->writeInUse = 0; SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
} }
return 1; return status;
} }
/*------------------------------------------------------------------------------*/ /*------------------------------------------------------------------------------*/
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
@ -939,6 +891,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
self->host = strdup(argv[2]); self->host = strdup(argv[2]);
self->port = atoi(argv[3]); self->port = atoi(argv[3]);
self->readList = LLDblobCreate(); self->readList = LLDblobCreate();
self->writeList = LLDblobCreate();
self->jtok = json_tokener_new();
ConnectRemoteObject(self); ConnectRemoteObject(self);
cmd = AddSICSHdbPar(pNew->objectNode, cmd = AddSICSHdbPar(pNew->objectNode,
@ -968,6 +922,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
snprintf(roTaskName,sizeof(roTaskName),"rowrite-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
return status; return status;
} }