Merge branch 'moreremo' of ssh://gitorious.psi.ch/sinqdev/sics into develop

This commit is contained in:
2015-06-05 08:35:21 +02:00
7 changed files with 287 additions and 272 deletions

View File

@ -581,6 +581,7 @@ void *ANETreadPtr(int handle, int *length)
con = findSocketDescriptor(handle); con = findSocketDescriptor(handle);
if (con == NULL) { if (con == NULL) {
*length = 0;
return NULL; return NULL;
} else { } else {
data = GetRWBufferData(con->readBuffer, length); data = GetRWBufferData(con->readBuffer, length);

View File

@ -74,6 +74,15 @@
#include "sicshipadaba.h" #include "sicshipadaba.h"
#include "protocol.h" #include "protocol.h"
#include "sicsvar.h" #include "sicsvar.h"
#include <json/json.h>
/*
Greetings from protocol.c for SCLogWrite...
*/
extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer,
int iOut);
/* /*
#define UUDEB 1 #define UUDEB 1
define UUDEB , for buffer writing for checking encoding */ define UUDEB , for buffer writing for checking encoding */
@ -255,6 +264,7 @@ static SConnection *CreateConnection(SicsInterp * pSics)
pRes->conStart = time(NULL); pRes->conStart = time(NULL);
pRes->write = SCNormalWrite; pRes->write = SCNormalWrite;
pRes->runLevel = RUNDRIVE; pRes->runLevel = RUNDRIVE;
pRes->remote = 0;
/* initialise context variables */ /* initialise context variables */
pRes->iCmdCtr = 0; pRes->iCmdCtr = 0;
@ -487,6 +497,7 @@ SConnection *SCCopyConnection(SConnection * pCon)
result->iList = -1; result->iList = -1;
result->runLevel = pCon->runLevel; result->runLevel = pCon->runLevel;
result->data = pCon->data; result->data = pCon->data;
result->remote = pCon->remote;
return result; return result;
} }
@ -1080,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
{ {
char pBueffel[1024]; char pBueffel[1024];
char *pPtr; char *pPtr;
json_object *myJson = NULL;
/* for commandlog tail */ /* for commandlog tail */
if (!VerifyConnection(self)) { if (!VerifyConnection(self)) {
return 0; return 0;
} }
if(self->iProtocolID == PROTACT) { if(self->iProtocolID == PROTACT) { /* act */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1098,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == PROTJSON) {
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else { } else {
testAndWriteSocket(self, buffer, iOut); testAndWriteSocket(self, buffer, iOut);
} }
@ -1111,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
{ {
char pBueffel[1024]; char pBueffel[1024];
char *pPtr; char *pPtr;
json_object *myJson = NULL;
if (!VerifyConnection(self)) { if (!VerifyConnection(self)) {
return 0; return 0;
@ -1119,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
WriteToCommandLogId(NULL, self->sockHandle, buffer); WriteToCommandLogId(NULL, self->sockHandle, buffer);
SetSendingConnection(NULL); SetSendingConnection(NULL);
if(self->iProtocolID == 4) { if(self->iProtocolID == PROTACT) { /* act */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1131,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == 2) { } else if(self->iProtocolID == PROTCODE) { /* withcode */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1143,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == PROTJSON) { /* json */
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else { } else {
testAndWriteSocket(self, buffer, iOut); testAndWriteSocket(self, buffer, iOut);
} }
@ -1830,7 +1855,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand)
memset(pBueffel, 0, 80); memset(pBueffel, 0, 80);
stptok(trim(pCommand), pBueffel, 79, " "); stptok(trim(pCommand), pBueffel, 79, " ");
self->iCmdCtr++; self->iCmdCtr++;
if (999999 < self->iCmdCtr) { if (self->iCmdCtr > 99998) {
self->iCmdCtr = 0; self->iCmdCtr = 0;
} }
self->transID = self->iCmdCtr; self->transID = self->iCmdCtr;
@ -1856,6 +1881,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand)
config File Filename Logs to another file config File Filename Logs to another file
config output normal | withcode | ACT Sets output mode config output normal | withcode | ACT Sets output mode
config listen 0 | 1 enables commandlog listen mode config listen 0 | 1 enables commandlog listen mode
config remote sets the remote connection flag
---------------------------------------------------------------------------*/ ---------------------------------------------------------------------------*/
int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData, int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData,
@ -1913,6 +1939,10 @@ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData,
SCSendOK(pCon); SCSendOK(pCon);
return 1; return 1;
} }
} else if(strcmp(argv[1],"remote") == 0) {
pMaster->remote = 1;
pCon->remote = 1;
return 1;
} }
/* check no or args */ /* check no or args */

View File

@ -71,6 +71,7 @@ typedef struct __SConnection {
pCosta pStack; /* stack of pending commands */ pCosta pStack; /* stack of pending commands */
int contextStack; /* context stack: may go? */ int contextStack; /* context stack: may go? */
mkChannel *pSock; /* for temporary backwards compatability */ mkChannel *pSock; /* for temporary backwards compatability */
int remote; /* true if this is a remote object connection */
} SConnection; } SConnection;
#include "nserver.h" #include "nserver.h"

View File

@ -242,6 +242,9 @@ static int DriveTaskFunc(void *data)
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
} }
traceSys("drive","DriveTask %s finished with state %d", taskData->name,status); traceSys("drive","DriveTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
}
return 0; return 0;
} }
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
@ -272,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget)
ExeInterest(pServ->pExecutor,name,"started"); ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name); DevexecLog("START",name);
InvokeNewTarget(pServ->pExecutor,name,fTarget); InvokeNewTarget(pServ->pExecutor,name,fTarget);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = DRIVEID; taskData->id = DRIVEID;
taskData->obj = obj; taskData->obj = obj;
@ -393,6 +399,9 @@ static int CountTaskFunc(void *data)
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem"); ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
} }
traceSys("count","CountTask %s finished with state %d", taskData->name,status); traceSys("count","CountTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
}
return 0; return 0;
} }
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
@ -418,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name)
} }
ExeInterest(pServ->pExecutor,name,"started"); ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name); DevexecLog("START",name);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = COUNTID; taskData->id = COUNTID;
taskData->obj = obj; taskData->obj = obj;

View File

@ -527,7 +527,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData,
SCSetInterrupt(pCon, eAbortBatch); SCSetInterrupt(pCon, eAbortBatch);
self->pDrivInt->iErrorCount = 0; self->pDrivInt->iErrorCount = 0;
child = GetHipadabaNode(self->pDescriptor->parNode, "status"); child = GetHipadabaNode(self->pDescriptor->parNode, "status");
UpdateHipadabaPar(child, MakeHdbText("run"), pCon); UpdateHipadabaPar(child, MakeHdbText("error"), pCon);
return hdbAbort; return hdbAbort;
} }

View File

@ -192,7 +192,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData,
SCWrite(pCon, "ERROR: no more memory", eError); SCWrite(pCon, "ERROR: no more memory", eError);
return 0; return 0;
} }
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID);
}
status = InterpExecute(pSics, comCon, command); status = InterpExecute(pSics, comCon, command);
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMEND %d", comCon->transID);
}
if (command != buffer) if (command != buffer)
free(command); free(command);
SCDeleteConnection(comCon); SCDeleteConnection(comCon);

View File

@ -2,7 +2,7 @@
* Remote objects in sicsobj. This means accessing remote objects in a different * Remote objects in sicsobj. This means accessing remote objects in a different
* SICS server from a master SICS server. * SICS server from a master SICS server.
* *
* Reading is implementd according to this scheme: * Reading is implemented according to this scheme:
* *
* * When a read connection is made between a local node and a remote node in slave, then a * * When a read connection is made between a local node and a remote node in slave, then a
* callback is installed on remote node in slave. * callback is installed on remote node in slave.
@ -17,7 +17,7 @@
* *
* COPRYRIGHT: see file COPYRIGHT * COPRYRIGHT: see file COPYRIGHT
* *
* Mark Koennecke, February 2015 * Mark Koennecke, February-May 2015
**/ **/
#include <unistd.h> #include <unistd.h>
#include <time.h> #include <time.h>
@ -31,22 +31,29 @@
#include <lld_blob.h> #include <lld_blob.h>
#include <dynstring.h> #include <dynstring.h>
#include <stptok.h> #include <stptok.h>
#include <json/json.h>
#define OOM -5001 /* out of memory */ #define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */ #define TO -5002 /* timeout */
#define READACT 7654
#define POCHACT 8437
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
extern char *trim(char *txt); extern char *trim(char *txt);
static int transactionID = 100000;
/*---------------------- our very private data structure -------------------*/ /*---------------------- our very private data structure -------------------*/
typedef struct { typedef struct {
char *host; char *host;
int port; int port;
int readHandle; int handle;
int writeHandle; int transactHandle;
int writeInUse;
int readList; int readList;
int writeList;
unsigned int connected; unsigned int connected;
time_t nextHeartbeat; time_t nextHeartbeat;
struct json_tokener *jtok;
} RemoteOBJ, *pRemoteOBJ; } RemoteOBJ, *pRemoteOBJ;
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
typedef struct { typedef struct {
@ -59,6 +66,12 @@ typedef struct {
char *remotePath; char *remotePath;
} UpdateCallback, *pUpdateCallback; } UpdateCallback, *pUpdateCallback;
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
typedef struct {
int transID;
SConnection *pCon;
int waitTask;
}writeData, *pWriteData;
/*----------------------------------------------------------------------------*/
void KillRemoteOBJ(void *data) void KillRemoteOBJ(void *data)
{ {
char roTaskName[132]; char roTaskName[132];
@ -68,66 +81,13 @@ void KillRemoteOBJ(void *data)
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
StopTask(pServ->pTasker,roTaskName); StopTask(pServ->pTasker,roTaskName);
free(self->host); free(self->host);
ANETclose(self->readHandle); ANETclose(self->handle);
ANETclose(self->writeHandle); ANETclose(self->transactHandle);
LLDdeleteBlob(self->readList); LLDdeleteBlob(self->readList);
LLDdeleteBlob(self->writeList);
json_tokener_free(self->jtok);
} }
} }
/*========================= reading related code ================================*/
static int RemoteReadCallback(int handle, void *userData)
{
int length;
char *pPtr, *pStart, *pEnd;
pPtr = ANETreadPtr(handle,&length);
/*
* deal with command results
*/
pStart = strstr(pPtr, "TRANSACTIONSTART");
pEnd = strstr(pPtr,"TRANSACTIONEND");
if(pStart != NULL && pEnd != NULL){
pStart = pStart + strlen("TRANSACTIONSTART");
*pEnd = '\0';
traceIO("RO","Received command - reply: %s", pStart);
pEnd += strlen("TRANSACTIONEND");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with update messages
*/
pStart = strstr(pPtr, "SROC:");
pEnd = strstr(pPtr,":EROC\r\n");
if(pStart != NULL && pEnd != NULL){
pStart += strlen("SROC:");
*pEnd = '\0';
InterpExecute(pServ->pSics, pServ->dummyCon,pStart);
traceIO("RO", "Received %s from remote", pStart);
pEnd += strlen("EROC\r\n");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with heartbeats
*/
if((pStart = strstr(pPtr,"Poch")) != NULL){
ANETreadConsume(handle,(pStart+4) - pPtr);
}
/*
If there is more stuff to process: recurse
*/
pPtr = ANETreadPtr(handle,&length);
if(length > 0 &&
( strstr(pPtr,":EROC\r\n") != NULL ||
strstr(pPtr,"TRANSACTIONEND") != NULL
|| strstr(pPtr,"Poch") != NULL ) ) {
RemoteReadCallback(handle,userData);
}
return 1;
}
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
static int transactCommand(int handle, char *command, char *reply, int replyLen) static int transactCommand(int handle, char *command, char *reply, int replyLen)
{ {
@ -135,7 +95,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
char *prefix = {"transact "}; char *prefix = {"transact "};
int status, length, type; int status, length, type;
time_t start; time_t start;
char *pPtr; char *pPtr, *pEnd;
/* /*
* read possible dirt of the line * read possible dirt of the line
@ -144,12 +104,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
ANETreadConsume(handle,length); ANETreadConsume(handle,length);
toSend = malloc(strlen(command) + strlen(prefix) + 1); toSend = malloc(strlen(command) + strlen(prefix) + 10);
if(toSend == NULL){ if(toSend == NULL){
return OOM; return OOM;
} }
strcpy(toSend, prefix); strcpy(toSend, prefix);
strcat(toSend, command); strcat(toSend, command);
if(strstr(command,"\n") == NULL){
strcat(toSend,"\r\n");
}
status = ANETwrite(handle,toSend,strlen(toSend)); status = ANETwrite(handle,toSend,strlen(toSend));
free(toSend); free(toSend);
if(status != 1){ if(status != 1){
@ -163,7 +126,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
while(time(NULL) < start + 2.0){ while(time(NULL) < start + 2.0){
ANETprocess(); ANETprocess();
pPtr = ANETreadPtr(handle,&length); pPtr = ANETreadPtr(handle,&length);
if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){
*pEnd = '\0';
strncpy(reply,pPtr,replyLen); strncpy(reply,pPtr,replyLen);
ANETreadConsume(handle,length); ANETreadConsume(handle,length);
return 1; return 1;
@ -191,9 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self)
return; return;
} }
self->readHandle = ANETconnect(self->host, self->port); self->handle = ANETconnect(self->host, self->port);
self->writeHandle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0){ if(self->handle < 0 || self->transactHandle < 0){
self->connected = 0; self->connected = 0;
traceIO("RO","Failed to connect to remote objects at %s, port %d", traceIO("RO","Failed to connect to remote objects at %s, port %d",
self->host, self->port); self->host, self->port);
@ -206,23 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self)
Default login with hard coded manager login. Defined in Default login with hard coded manager login. Defined in
nserver.c nserver.c
*/ */
ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->handle,login,strlen(login));
ANETwrite(self->writeHandle,login,strlen(login)); ANETwrite(self->transactHandle,login,strlen(login));
usleep(500); usleep(500);
ANETprocess(); ANETprocess();
/* /*
eat the login responses eat the login responses
*/ */
pPtr = ANETreadPtr(self->readHandle, &length); pPtr = ANETreadPtr(self->handle, &length);
ANETreadConsume(self->readHandle,length); ANETreadConsume(self->handle,length);
pPtr = ANETreadPtr(self->writeHandle, &length); pPtr = ANETreadPtr(self->transactHandle, &length);
ANETreadConsume(self->writeHandle,length); ANETreadConsume(self->transactHandle,length);
transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1);
/*
* install the read callback
*/
ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL);
/* /*
* Remove geterror on read nodes and reinstall callbacks for reconnects * Remove geterror on read nodes and reinstall callbacks for reconnects
@ -233,17 +194,15 @@ static void ConnectRemoteObject(pRemoteOBJ self)
node = FindHdbNode(NULL,rd.localNode,NULL); node = FindHdbNode(NULL,rd.localNode,NULL);
if(node != NULL){ if(node != NULL){
SetHdbProperty(node,"geterror",NULL); SetHdbProperty(node,"geterror",NULL);
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode); READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command)); ANETwrite(self->handle,command,strlen(command));
} }
status = LLDnodePtr2Next(self->readList); status = LLDnodePtr2Next(self->readList);
} }
transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command));
self->connected = 1; self->connected = 1;
self->writeInUse = 0;
} }
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
static void MarkDisconnected(pRemoteOBJ self) static void MarkDisconnected(pRemoteOBJ self)
@ -270,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
pUpdateCallback uppi = (pUpdateCallback)userData; pUpdateCallback uppi = (pUpdateCallback)userData;
hdbDataMessage *mm = NULL; hdbDataMessage *mm = NULL;
pDynString text; pDynString text;
char *prefix = {"SROC:hupdate "}; char *prefix = {"hupdate "};
char *postfix= {":EROC\r\n"}; char *postfix= {" \r\n"};
char *txt = NULL; char *txt = NULL;
int length; int length;
pHdbPropertyChange propChange = NULL; pHdbPropertyChange propChange = NULL;
@ -309,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
if(!SCisConnected(uppi->sendCon)){ if(!SCisConnected(uppi->sendCon)){
return hdbKill; return hdbKill;
} }
length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + length = strlen("hdelprop ") + strlen(uppi->remotePath) +
strlen(propChange->key) + 10; strlen(propChange->key) + 10;
if(propChange->value != NULL){ if(propChange->value != NULL){
length += strlen(propChange->value); length += strlen(propChange->value);
@ -319,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
return hdbContinue; return hdbContinue;
} }
if(propChange->value == NULL){ if(propChange->value == NULL){
snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath,
propChange->key,postfix); propChange->key,postfix);
} else { } else {
snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath,
propChange->key,propChange->value, postfix); propChange->key,propChange->value, postfix);
} }
SCWrite(uppi->sendCon,txt,eValue); SCWrite(uppi->sendCon,txt,eValue);
@ -397,7 +356,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Get information about the remote node and check compatability * Get information about the remote node and check compatability
*/ */
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
/* /*
* try a reconnect, * try a reconnect,
@ -407,7 +366,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
*/ */
self->connected = 0; self->connected = 0;
ConnectRemoteObject(self); ConnectRemoteObject(self);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host); self->host);
@ -448,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Install a callback on the remote node to update the master. The remote should * Install a callback on the remote node to update the master. The remote should
* then immediatly send an update which will be processed by the read callback. * then immediatly send an update which will be processed by the read callback.
*/ */
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode); READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command)); ANETwrite(self->handle,command,strlen(command));
return 1; return 1;
} }
@ -492,10 +451,10 @@ static int HeartbeatTask(void *pData)
{ {
pRemoteOBJ self = (pRemoteOBJ)pData; pRemoteOBJ self = (pRemoteOBJ)pData;
int status; int status;
char command[] = {"Poch\r\n"}; char command[] = {"contextdo 8437 Poch\r\n"};
if (time(NULL) > self->nextHeartbeat){ if (time(NULL) > self->nextHeartbeat){
status = ANETwrite(self->readHandle,command, strlen(command)); status = ANETwrite(self->handle,command, strlen(command));
if(status != 1){ if(status != 1){
traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port);
self->connected = 0; self->connected = 0;
@ -509,11 +468,13 @@ static int HeartbeatTask(void *pData)
return 1; return 1;
} }
/*============================= writing related code =========================== /*============================= writing related code ===========================
The logic here is to use the standard writeHandle when available. I expect most This works by sending the command via contextdo with a ID > 10^6. This causes
communication to be short and to happen through the writeHandle. If that one is the remote SICS to send the termination messages. The transaction IDs together
in use, a new connection will be built. with the connection responsible for it are kept in a list.
---------------------------------------------------------------------------------
suppress all superfluous OK from the slave This list is used by the write task to forward messages properly and for handling
termination.
-----------------------------------------------------------------------------------*/ -----------------------------------------------------------------------------------*/
#include <outcode.c> #include <outcode.c>
static OutCode findOutCode(char *txt) static OutCode findOutCode(char *txt)
@ -527,93 +488,153 @@ static OutCode findOutCode(char *txt)
} }
return eValue; return eValue;
} }
/*--------------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------------*/
static void printSICS(char *answer, SConnection *pCon) static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText)
{ {
char line[1024], *pPtr, *pCode; int status;
OutCode eCode; writeData WD;
pPtr = answer;
while(pPtr != NULL){
memset(line,0,sizeof(line));
pPtr = stptok(pPtr,line,sizeof(line),"\n");
if(strstr(line,"OK") == NULL){
pCode = strstr(line,"@@");
if(pCode != NULL){
*pCode = '\0';
pCode += 2;
eCode = findOutCode(trim(pCode));
status = LLDnodePtr2First(writeList);
while(status == 1){
LLDblobData(writeList,&WD);
if(WD.transID == transID){
if(strstr(pText,"COMSTART") != NULL){
/* skip */
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) {
SCDeleteConnection(WD.pCon);
LLDblobDelete(writeList);
return;
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) {
/* skip */
return;
} else if(strstr(pText,"TASKSTART") != NULL){
WD.waitTask = 1 ;
LLDblobDelete(writeList);
LLDblobAppend(writeList,&WD, sizeof(writeData));
return;
} else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){
SCDeleteConnection(WD.pCon);
LLDblobDelete(writeList);
return;
} else { } else {
eCode = eValue; if(strstr(pText,"OK") == NULL){
SCWrite(WD.pCon,pText,eOut);
}
return;
} }
SCWrite(pCon,line,eCode);
} }
status = LLDnodePtr2Next(writeList);
} }
} }
/*---------------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------------*/
static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) static int WriteResponseTask(void *pData)
{ {
int handle, length; pRemoteOBJ self = (pRemoteOBJ)pData;
char *answer = NULL; int status, length = 0, transID;
char command[80]; char *pText, *outTxt;
json_object *message = NULL, *data = NULL;
enum json_tokener_error tokerr;
OutCode eOut;
writeData WD;
if(self->writeInUse) { if(!ANETvalidHandle(self->handle)) {
handle = ANETconnect(self->host,self->port); return 1;
if(handle < 0){
traceIO("RO","Failed to connect to %s at %d", self->host, self->port);
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port);
}
return handle;
}
ANETwrite(handle,login,strlen(login));
usleep(500);
ANETprocess();
/*
eat the login responses
*/
answer = ANETreadPtr(handle, &length);
ANETreadConsume(handle,length);
*newHandle = 1;
transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command));
} else {
self->writeInUse = 1;
handle = self->writeHandle;
/*
eat dirt from the line
*/
answer = ANETreadPtr(handle, &length);
ANETreadConsume(handle,length);
} }
return handle;
pText = ANETreadPtr(self->handle,&length);
while(length > 0){
json_tokener_reset(self->jtok);
message = json_tokener_parse_ex(self->jtok,pText,length);
tokerr = self->jtok->err;
if(tokerr == json_tokener_continue){
return 1;
} else if(tokerr != json_tokener_success) {
traceIO("RO","JSON parsing error %s on %s from %s %d",
json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset);
ANETreadConsume(self->handle,length);
return 1;
}
if(json_object_get_type(message) != json_type_object) {
traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
ANETreadConsume(self->handle,length);
return 1;
}
/*
we need to consume here what has been parsed.
The char_offset in the tokenizer structure might tell us that...
*/
ANETreadConsume(self->handle,self->jtok->char_offset);
/*
Received a valid message, process
*/
data = json_object_object_get(message,"trans");
if(data == NULL){
traceIO("RO","No transaction ID found in %s from %s", pText,self->host);
return 1;
}
transID = json_object_get_int(data);
data = json_object_object_get(message,"flag");
if(data == NULL){
traceIO("RO","No flag found in %s from %s", pText,self->host);
return 1;
}
outTxt = (char *)json_object_get_string(data);
eOut = findOutCode(outTxt);
data = json_object_object_get(message,"data");
if(data == NULL){
traceIO("RO","No data found in %s from %s", pText,self->host);
return 1;
}
pText = (char *)json_object_get_string(data);
traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);
/*
do nothing on Poch
*/
if(transID == POCHACT){
pText = ANETreadPtr(self->handle,&length);
json_object_put(message);
continue;
}
/*
process update messages
*/
if(transID == READACT){
if(strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL){
InterpExecute(pServ->pSics,pServ->dummyCon,pText);
}
traceIO("RO","Received %s from remote",pText);
pText = ANETreadPtr(self->handle,&length);
json_object_put(message);
continue;
}
/*
check write List
*/
CheckWriteList(self->writeList,transID,eOut,pText);
json_object_put(message);
pText = ANETreadPtr(self->handle,&length);
}
return 1;
} }
/*---------------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------------*/
static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) static int IncrementTransactionID()
{ {
char *answer = NULL, *pEnd, *command = NULL; transactionID++;
int length; if(transactionID >= 200000){
transactionID = 100000;
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host);
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
printSICS(answer,pCon);
}
traceIO("RO","%s:%d: Received %s", self->host, self->port,answer);
ANETreadConsume(handle,length);
break;
}
} }
return transactionID;
} }
/*---------------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------------*/
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
@ -626,14 +647,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
pDynString data; pDynString data;
char *remoteNode; char *remoteNode;
char *command, *answer, *pEnd; char *command, *answer, *pEnd;
writeData WD;
if((mm = GetHdbSetMessage(mes)) != NULL){ if((mm = GetHdbSetMessage(mes)) != NULL){
pCon = (SConnection *)mm->callData; pCon = (SConnection *)mm->callData;
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return hdbAbort;
}
/* /*
build the command to send build the command to send
@ -648,53 +665,32 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
} }
return hdbAbort; return hdbAbort;
} }
snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); WD.pCon = SCCopyConnection(pCon);
WD.waitTask = 0;
WD.transID = IncrementTransactionID();
snprintf(command,length,"contextdo %d hset %s %s\r\n",
WD.transID, remoteNode, GetCharArray(data));
/* /*
write write
*/ */
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
status = ANETwrite(handle,command,strlen(command)); LLDblobAppend(self->writeList,&WD,sizeof(writeData));
status = ANETwrite(self->handle,command,strlen(command));
free(command); free(command);
DeleteDynString(data); DeleteDynString(data);
if(status < 0){ if(status < 0){
if(pCon != NULL){ self->connected = 0;
SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); ConnectRemoteObject(self);
} if(self->connected == 0){
return hdbAbort; if(pCon != NULL){
} SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected",
remoteNode, self->host);
/* }
wait for a response: TRANSACTIONFINISHED return hdbAbort;
*/
ProcessWriteResponse(self,handle,pCon);
/*
Is there a termination script?
*/
command = GetHdbProp(currentNode,"termscript");
if(command != NULL){
while(1) {
TaskYield(pServ->pTasker);
Tcl_Eval(InterpGetTcl(pServ->pSics),command);
answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics));
if(strstr(answer,"idle") != NULL){
answer = ANETreadPtr(handle,&length);
printSICS(answer,pCon);
traceIO("RO","%s:%d:Received %s", self->host,self->port,answer);
ANETreadConsume(handle,length);
break;
}
} }
} }
return hdbContinue;
if(newHandle){
ANETclose(handle);
} else {
self->writeInUse = 0;
}
} }
return hdbContinue; return hdbContinue;
@ -722,12 +718,17 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
SetHdbProperty(localNode,"remotewrite",rd.remoteNode); SetHdbProperty(localNode,"remotewrite",rd.remoteNode);
AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback,
self,NULL)); self,NULL));
/*
TODO: The connected write nodes should be held in a list in order to be able to
remove the write callbacks when deleting the remote object. As removing remote
objects usually only happens when SICS shuts down this is not so important.
*/
/* /*
* Get information about the remote node and check compatability * Get information about the remote node and check compatability
*/ */
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){ if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host); self->host);
@ -784,61 +785,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
/*============================ remote execute =================================*/ /*============================ remote execute =================================*/
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
{ {
int status, handle, newHandle = 0, length; int status;
char *answer, *pEnd; char answer[65536];
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return 0;
}
/* /*
write, thereby taking care to prefix with transact and for proper termination write, thereby taking care to prefix with transact and for proper termination
*/ */
if(strstr(command,"transact") == NULL){ memset(answer,0,sizeof(answer)-1);
ANETwrite(handle,"transact ", sizeof("transact ")); status = transactCommand(self->transactHandle,command,answer,sizeof(answer));
} if(status == 1){
status = ANETwrite(handle,command,strlen(command)); SCWrite(pCon,answer,eValue);
if(strstr(command,"\n") == NULL){
ANETwrite(handle,"\r\n",2);
}
if(status < 0){
traceIO("RO","Disconnect from %s while executing %s", self->host, command);
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
return 0;
}
/*
wait for response
*/
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
SCPrintf(pCon,eValue,answer);
}
ANETreadConsume(handle,length);
break;
}
}
if(newHandle){
ANETclose(handle);
} else { } else {
self->writeInUse = 0; SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
} }
return 1; return status;
} }
/*------------------------------------------------------------------------------*/ /*------------------------------------------------------------------------------*/
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
@ -920,6 +881,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
self->host = strdup(argv[2]); self->host = strdup(argv[2]);
self->port = atoi(argv[3]); self->port = atoi(argv[3]);
self->readList = LLDblobCreate(); self->readList = LLDblobCreate();
self->writeList = LLDblobCreate();
self->jtok = json_tokener_new();
ConnectRemoteObject(self); ConnectRemoteObject(self);
cmd = AddSICSHdbPar(pNew->objectNode, cmd = AddSICSHdbPar(pNew->objectNode,
@ -949,6 +912,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
return status; return status;
} }