Files
sics/remoteobject.c

945 lines
27 KiB
C

/**
* Remote objects in sicsobj. This means accessing remote objects in a different
* SICS server from a master SICS server.
*
* Reading is implemented according to this scheme:
*
* * When a read connection is made between a local node and a remote node in slave, then a
* callback is installed on remote node in slave.
* * The callback on remote in slave sends commands to update the local node when either the
* value or the geterror property changes on the remote node. The commands sent are
* enclosed in special delimiters
* * A special ANET callback evaluates the data coming from slave and acts accordingly, thus
* updating the local node. This is driven by the general network driving code of SICS
*
* * in order to detect availability and re-availability of slave a Heartbeat Task sends a
* heartbeat message to slave. Thereby testing the connection regularly, trying to reconnect etc.
*
* COPYRIGHT: see file COPYRIGHT
*
* Mark Koennecke, February-May 2015
**/
#include <unistd.h>
#include <time.h>
#include <tcl.h>
#include <mxml.h>
#include <sics.h>
#include <sicsobj.h>
#include <sicshipadaba.h>
#include <asynnet.h>
#include <lld.h>
#include <lld_blob.h>
#include <dynstring.h>
#include <stptok.h>
#include <json/json.h>
/* Error codes returned by transactCommand in addition to the ANETwrite status */
#define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */
/*
 * Transaction ID used with contextdo when installing the remote update
 * callbacks (addremotecb). WriteResponseTask executes the data of replies
 * carrying this ID as commands in the local interpreter.
 */
#define READACT 7654
/*
 * Transaction ID of the heartbeat message. WriteResponseTask ignores replies
 * carrying this ID.
 */
#define POCHACT 8437
/* Login line written to both connections right after they have been opened */
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
/* Implemented outside of this file */
extern char *trim(char *txt);
/* Transaction ID handed out last, see IncrementTransactionID */
static int transactionID = 100000;
/*---------------------- our very private data structure -------------------*/
/*
 * Private data of one remote object, i.e. one connection pair to a
 * remote SICS server.
 */
typedef struct {
char *host;                 /* remote host name, owned copy (strdup) */
int port;                   /* remote port number */
int handle;                 /* ANET handle for callbacks, heartbeat and write traffic;
                               read by WriteResponseTask */
int transactHandle;         /* ANET handle used for blocking transactCommand calls */
int readList;               /* LLD blob list of ReadData entries */
int writeList;              /* LLD blob list of writeData entries */
unsigned int connected;     /* non-zero once ConnectRemoteObject succeeded */
time_t nextHeartbeat;       /* time when HeartbeatTask sends the next heartbeat */
struct json_tokener *jtok;  /* tokener used to parse the replies on handle */
} RemoteOBJ, *pRemoteOBJ;
/*----------------------------------------------------------------------------*/
/*
 * A pair of node paths. Stored in the read list and also used to pass the
 * arguments of the connectread and connectwrite commands around.
 */
typedef struct {
char localNode[1024];   /* path of the node in this SICS server */
char remoteNode[1024];  /* path of the node in the remote SICS server */
} ReadData, *pReadData;
/*---------------------------------------------------------------------------*/
/*
 * User data of ROUpdateCallback, created by AddRemoteCallback and released
 * by KillUpdateStruct.
 */
typedef struct {
SConnection *sendCon;  /* copied connection the update commands are written to */
char *remotePath;      /* node path inserted into the generated commands, owned copy */
} UpdateCallback, *pUpdateCallback;
/*----------------------------------------------------------------------------*/
/*
 * Entry of the write list: associates the transaction ID of a pending
 * remote hset with the connection which issued it.
 */
typedef struct {
int transID;        /* transaction ID sent with contextdo */
SConnection *pCon;  /* copied connection to forward replies to */
int waitTask;       /* set to 1 by WriteResponseTask when TASKSTART was received */
}writeData, *pWriteData;
/*----------------------------------------------------------------------------*/
/**
 * Release the resources held by the private data of a remote object.
 *
 * Both tasks registered in MakeRemoteObject ("ro-<host>-<port>" for the
 * heartbeat and "rocom-<host>-<port>" for the response processing) are
 * stopped. Both receive the RemoteOBJ as their user data, so each of them
 * must be gone before the members are released.
 *
 * NOTE(review): the RemoteOBJ structure itself is not freed here, same as
 * before; confirm whether the owning SICS object releases it.
 *
 * @param data the RemoteOBJ to clean up, may be NULL
 */
void KillRemoteOBJ(void *data)
{
  /* same size as the buffer used to build the names in MakeRemoteObject */
  char roTaskName[256];
  pRemoteOBJ self = (pRemoteOBJ) data;

  if(self == NULL){
    return;
  }
  snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
  StopTask(pServ->pTasker,roTaskName);
  snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port);
  StopTask(pServ->pTasker,roTaskName);
  free(self->host);
  ANETclose(self->handle);
  ANETclose(self->transactHandle);
  LLDdeleteBlob(self->readList);
  LLDdeleteBlob(self->writeList);
  json_tokener_free(self->jtok);
}
/*-----------------------------------------------------------------------------*/
/**
 * Send a command prefixed with "transact " on the given ANET handle and wait
 * up to 2 seconds for a reply terminated by TRANSACTIONFINISHED.
 *
 * @param handle   ANET handle to talk to
 * @param command  the command; "\r\n" is appended when it contains no newline
 * @param reply    buffer receiving the reply text before TRANSACTIONFINISHED,
 *                 always NUL terminated on success
 * @param replyLen size of reply in bytes; at most replyLen-1 characters are copied
 * @return 1 on success, OOM when out of memory, TO on timeout or the
 *         ANETwrite status when writing failed
 */
static int transactCommand(int handle, char *command, char *reply, int replyLen)
{
  char *toSend = NULL;
  char *prefix = {"transact "};
  int status, length = 0;
  time_t start;
  char *pPtr, *pEnd;

  /*
   * read possible dirt off the line
   */
  pPtr = ANETreadPtr(handle,&length);
  ANETreadConsume(handle,length);

  toSend = malloc(strlen(command) + strlen(prefix) + 10);
  if(toSend == NULL){
    return OOM;
  }
  strcpy(toSend, prefix);
  strcat(toSend, command);
  if(strstr(command,"\n") == NULL){
    strcat(toSend,"\r\n");
  }
  status = ANETwrite(handle,toSend,strlen(toSend));
  free(toSend);
  if(status != 1){
    return status;
  }

  /*
   * wait for a reply for max 2 seconds
   */
  start = time(NULL);
  while(time(NULL) < start + 2.0){
    ANETprocess();
    /* reset so that a stale count is never used should the read fail */
    length = 0;
    pPtr = ANETreadPtr(handle,&length);
    if(pPtr != NULL && length > 0
       && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){
      *pEnd = '\0';
      if(reply != NULL && replyLen > 0){
        /* strncpy does not terminate when the source fills the buffer */
        strncpy(reply,pPtr,replyLen - 1);
        reply[replyLen - 1] = '\0';
      }
      ANETreadConsume(handle,length);
      return 1;
    }
    usleep(100);
  }

  /*
   * here we have run into a timeout
   */
  ANETreadConsume(handle,length);
  return TO;
}
/*----------------------------------------------------------------------------*/
/**
 * Open both connections to the remote server, log in, switch the callback
 * connection to the json protocol and reinstall the remote update callbacks
 * for all entries of the read list. Does nothing when self->connected is
 * already set. On success self->connected is set to 1.
 *
 * When only one of the two connects succeeds, the successful one is closed
 * again and its handle invalidated. Otherwise the handle would leak, and
 * heartbeats written to a still working self->handle would hide the failure
 * so that no reconnect would ever be attempted.
 *
 * @param self the remote object to connect
 */
static void ConnectRemoteObject(pRemoteOBJ self)
{
  char command[1024];
  int length = 0, status;
  ReadData rd;
  pHdb node;

  if(self->connected){
    return;
  }

  self->handle = ANETconnect(self->host, self->port);
  self->transactHandle = ANETconnect(self->host, self->port);
  if(self->handle < 0 || self->transactHandle < 0){
    if(self->handle >= 0){
      ANETclose(self->handle);
      self->handle = -1;
    }
    if(self->transactHandle >= 0){
      ANETclose(self->transactHandle);
      self->transactHandle = -1;
    }
    self->connected = 0;
    traceIO("RO","Failed to connect to remote objects at %s, port %d",
	    self->host, self->port);
    return;
  }
  traceIO("RO","Connected to %s, port %d for remote objects",
	  self->host, self->port);

  /*
    Default login with hard coded manager login. Defined in
    nserver.c
  */
  ANETwrite(self->handle,login,strlen(login));
  ANETwrite(self->transactHandle,login,strlen(login));
  usleep(500);
  ANETprocess();

  /*
    eat the login responses
  */
  ANETreadPtr(self->handle, &length);
  ANETreadConsume(self->handle,length);
  length = 0;
  ANETreadPtr(self->transactHandle, &length);
  ANETreadConsume(self->transactHandle,length);

  transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1);

  /*
   * Remove geterror on read nodes and reinstall callbacks for reconnects
   */
  status = LLDnodePtr2First(self->readList);
  while(status != 0) {
    LLDblobData(self->readList,&rd);
    node = FindHdbNode(NULL,rd.localNode,NULL);
    if(node != NULL){
      SetHdbProperty(node,"geterror",NULL);
      snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
	       READACT, rd.remoteNode, rd.localNode);
      ANETwrite(self->handle,command,strlen(command));
    }
    status = LLDnodePtr2Next(self->readList);
  }
  self->connected = 1;
}
/*-----------------------------------------------------------------------------*/
/**
 * Flag the remote object as disconnected: every local node found through the
 * read list gets its geterror property set, then self->connected is cleared.
 *
 * @param self the remote object which lost its connection
 */
static void MarkDisconnected(pRemoteOBJ self)
{
  ReadData entry;
  pHdb localNode;
  int more;

  for(more = LLDnodePtr2First(self->readList); more != 0;
      more = LLDnodePtr2Next(self->readList)){
    LLDblobData(self->readList,&entry);
    localNode = FindHdbNode(NULL,entry.localNode,NULL);
    if(localNode == NULL){
      /* node vanished in the meantime, nothing to mark */
      continue;
    }
    SetHdbProperty(localNode,"geterror","Disconnected from remote server");
  }
  self->connected = 0;
}
/*-----------------------------------------------------------------------------*/
/**
 * Hipadaba callback installed by AddRemoteCallback. It forwards value updates
 * as "hupdate" commands and property changes as "hsetprop"/"hdelprop"
 * commands through the connection stored in the user data.
 *
 * @param currentNode the node the callback is attached to
 * @param userData    the UpdateCallback structure
 * @param mes         the message to process
 * @return hdbKill when the connection stored in the user data is no longer
 *         connected, so that the callback gets removed; hdbContinue otherwise
 */
static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
                            pHdbMessage mes)
{
  pUpdateCallback uppi = (pUpdateCallback)userData;
  hdbDataMessage *mm = NULL;
  pDynString text;
  char *prefix = {"hupdate "};
  char *postfix= {" \r\n"};
  char *txt = NULL;
  int length;
  pHdbPropertyChange propChange = NULL;

  mm = GetHdbUpdateMessage(mes);
  if(mm != NULL){
    /*
     * remove myself when the connection is dead...
     */
    if(!SCisConnected(uppi->sendCon)){
      return hdbKill;
    }
    /*
     * format and send the update command to master
     */
    text = formatValue(*(mm->v), currentNode);
    if(text == NULL){
      return hdbContinue;
    }
    length = GetDynStringLength(text) +
      strlen(prefix) + strlen(postfix) + strlen(uppi->remotePath) +5;
    txt = malloc(length*sizeof(char));
    if(txt == NULL){
      /* do not leak the formatted value when out of memory */
      DeleteDynString(text);
      return hdbContinue;
    }
    snprintf(txt,length,"%s %s %s %s", prefix, uppi->remotePath,
	     GetCharArray(text), postfix);
    SCWrite(uppi->sendCon,txt,eValue);
    free(txt);
    DeleteDynString(text);
  }

  propChange = GetPropertyChangeMessage(mes);
  if(propChange != NULL){
    /*
     * remove myself when the connection is dead...
     */
    if(!SCisConnected(uppi->sendCon)){
      return hdbKill;
    }
    length = strlen("hdelprop ") + strlen(uppi->remotePath) +
      strlen(propChange->key) + 10;
    if(propChange->value != NULL){
      length += strlen(propChange->value);
    }
    txt = malloc(length*sizeof(char));
    if(txt == NULL){
      return hdbContinue;
    }
    /* a NULL value means that the property has been deleted */
    if(propChange->value == NULL){
      snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath,
	       propChange->key,postfix);
    } else {
      snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath,
	       propChange->key,propChange->value, postfix);
    }
    SCWrite(uppi->sendCon,txt,eValue);
    free(txt);
  }
  return hdbContinue;
}
/*-----------------------------------------------------------------------------*/
/**
 * Hipadaba callback which makes a get fail while the node carries a geterror
 * property. The error is written to the connection found in the message and,
 * for text nodes, stored as the value as well.
 *
 * @param currentNode the node being read
 * @param userData    not used
 * @param mes         the message to process
 * @return hdbAbort when a get message hits a node with geterror set,
 *         hdbContinue in all other cases
 */
static hdbCallbackReturn GetErrorCallback(pHdb currentNode, void *userData,
                            pHdbMessage mes)
{
  char errorText[512];
  char *problem;
  SConnection *client = NULL;
  hdbDataMessage *getMessage = GetHdbGetMessage(mes);

  /* only get messages are of interest */
  if (getMessage == NULL) {
    return hdbContinue;
  }
  client = getMessage->callData;

  problem = GetHdbProp(currentNode, "geterror");
  if (problem == NULL) {
    return hdbContinue;
  }

  snprintf(errorText,sizeof(errorText),"ERROR: %s", problem);
  SCWrite(client, errorText, eError);
  if (getMessage->v->dataType == HIPTEXT) {
    /* replace the text value by the error message */
    if (getMessage->v->v.text != NULL) {
      free(getMessage->v->v.text);
    }
    getMessage->v->v.text = strdup(errorText);
  }
  return hdbAbort;
}
/*-----------------------------------------------------------------------------*/
/**
 * Release an UpdateCallback structure: the copied connection, the path
 * string and the structure itself.
 *
 * @param data the UpdateCallback to release, may be NULL
 */
static void KillUpdateStruct(void *data)
{
  pUpdateCallback victim = (pUpdateCallback)data;

  if(data == NULL){
    return;
  }
  SCDeleteConnection(victim->sendCon);
  free(victim->remotePath);
  free(victim);
}
/*-----------------------------------------------------------------------------*/
/**
 * Connect a local node for reading to a node in the remote server.
 *
 * The remote node is queried with hinfo and its type compared against the
 * local one. On success the pair is entered into the read list, a
 * GetErrorCallback is installed on the local node, the remoteread property is
 * set and the remote server is asked to install its update callback.
 * When the remote cannot be reached even after one reconnect attempt, the
 * connection is registered anyway so that it is activated on reconnect.
 *
 * @param self the remote object
 * @param pCon connection for error reporting
 * @param rd   local and remote node path
 * @return 1 on success (including the "not yet reachable" case), 0 on error
 */
static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
{
  char command[1024], reply[1024], *pPtr;
  int status, type;
  pHdb localNode = NULL;

  /*
   * Initialize....
   */
  memset(reply,0,sizeof(reply));
  localNode = FindHdbNode(NULL,rd.localNode, pCon);
  if(localNode == NULL){
    SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
    return 0;
  }

  /**
   * Refuse duplicate connections
   */
  pPtr = GetHdbProp(localNode,"remoteread");
  if(pPtr != NULL){
    SCPrintf(pCon,eError,"ERROR: %s is already connected to %s", rd.localNode, pPtr);
    return 0;
  }

  /*
   * Get information about the remote node and check compatability.
   * One byte of reply is held back so that it always stays terminated.
   */
  snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
  status = transactCommand(self->transactHandle,command,reply,sizeof(reply)-1);
  if(status != 1){
    /*
     * try a reconnect,
     * when fails:
     *   Warning
     *   add blob
     */
    self->connected = 0;
    ConnectRemoteObject(self);
    status = transactCommand(self->transactHandle,command,reply,sizeof(reply)-1);
    if(status != 1){
      SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
	       self->host);
      MarkDisconnected(self);
      LLDblobAdd(self->readList,&rd,sizeof(rd));
      AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
							     NULL,NULL));
      SetHdbProperty(localNode,"remoteread",rd.remoteNode);
      return 1;
    }
  }
  if(strstr(reply, "ERROR") != NULL){
    SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
	     reply, rd.remoteNode);
    return 0;
  }

  /* only interested in type: answer is of style: type,nochildren,length */
  pPtr = strchr(reply,',');
  if(pPtr == NULL){
    /* a reply without comma cannot be cut at the type field */
    SCPrintf(pCon,eError,"ERROR: malformed hinfo reply %s for remote node %s",
	     reply, rd.remoteNode);
    return 0;
  }
  *pPtr= '\0';
  type = convertHdbType(reply);
  if(type != localNode->value.dataType){
    SCPrintf(pCon,eError,
	     "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
	     rd.localNode, rd.remoteNode, localNode->value.dataType, type);
    return 0;
  }

  /*
   * Make an entry in the read list
   */
  LLDblobAdd(self->readList,&rd,sizeof(rd));
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
							 NULL,NULL));
  SetHdbProperty(localNode,"remoteread",rd.remoteNode);

  /*
   * Install a callback on the remote node to update the master. The remote should
   * then immediatly send an update which will be processed by the read callback.
   */
  snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
	   READACT, rd.remoteNode, rd.localNode);
  ANETwrite(self->handle,command,strlen(command));
  return 1;
}
/*-----------------------------------------------------------------------------*/
/**
 * Command wrapper for connectread: takes the local and remote node path from
 * the first two parameters and calls ConnectRead.
 *
 * @param ccmd    the remote object, its pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode not used
 * @param par     parameter nodes: par[0] local node, par[1] remote node
 * @param nPar    number of entries in par
 * @return the status of ConnectRead, 0 when parameters are missing
 */
static int ConnectreadCmd(pSICSOBJ ccmd, SConnection * pCon,
			  Hdb * cmdNode, Hdb * par[], int nPar)
{
  ReadData rd;
  int status;
  pRemoteOBJ self;

  if(nPar < 2 || par[0]->value.v.text == NULL || par[1]->value.v.text == NULL) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for connectread",
	    eError);
    return 0;
  }

  /*
   * Initialize.... The structure is copied into the read list as a blob,
   * so clear it completely. snprintf always terminates, unlike strncpy.
   */
  memset(&rd,0,sizeof(rd));
  snprintf(rd.localNode, sizeof(rd.localNode), "%s", par[0]->value.v.text);
  snprintf(rd.remoteNode, sizeof(rd.remoteNode), "%s", par[1]->value.v.text);
  self = (pRemoteOBJ)ccmd->pPrivate;

  status = ConnectRead(self,pCon,rd);
  if(status == 1){
    SCSendOK(pCon);
  }
  return status;
}
/*-----------------------------------------------------------------------------*/
/**
 * Task function testing the connection to the remote server. Once the
 * heartbeat time has passed, a heartbeat command is written; when writing
 * fails a reconnect is tried and, should that fail too, the object is marked
 * as disconnected. The next heartbeat is then scheduled 10 seconds later.
 *
 * @param pData the RemoteOBJ
 * @return always 1
 */
static int HeartbeatTask(void *pData)
{
  char command[] = {"contextdo 8437 Poch\r\n"};
  pRemoteOBJ self = (pRemoteOBJ)pData;

  /* not yet time for the next heartbeat */
  if (time(NULL) <= self->nextHeartbeat){
    return 1;
  }

  if(ANETwrite(self->handle,command, strlen(command)) != 1){
    traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port);
    self->connected = 0;
    ConnectRemoteObject(self);
    if(!self->connected){
      MarkDisconnected(self);
    }
  }
  self->nextHeartbeat = time(NULL) + 10;
  return 1;
}
/*============================= writing related code ===========================
This works by sending the command via contextdo with a transaction ID from the range 100000 to 199999 (see IncrementTransactionID). This causes
the remote SICS to send the termination messages. The transaction IDs together
with the connection responsible for it are kept in a list.
This list is used by the write task to forward messages properly and for handling
termination.
-----------------------------------------------------------------------------------*/
#include <outcode.c>
/**
 * Find the output code whose name is contained in txt.
 *
 * @param txt the text to search in
 * @return the index of the first entry of pCode found in txt, eValue when
 *         none of the iNoCodes entries matches
 */
static OutCode findOutCode(char *txt)
{
  int idx = 0;

  while(idx < iNoCodes){
    if(strstr(txt,pCode[idx]) != NULL){
      return idx;
    }
    idx++;
  }
  return eValue;
}
/*-----------------------------------------------------------------------------------*/
/**
 * Task function processing the JSON messages arriving on self->handle.
 *
 * Each message must be a JSON object with the members trans (transaction
 * ID), flag (output code name) and data (text). Messages with the POCHACT
 * ID are ignored, the data of messages with the READACT ID is executed in the
 * local interpreter. Everything else is matched against the write list in
 * order to forward the text to the connection which started the transaction
 * and to detect the end of the transaction.
 *
 * The object returned by the tokener is owned by this function and released
 * with json_object_put on every path; the member objects and the strings
 * obtained from them belong to it and must not be used afterwards.
 *
 * @param pData the RemoteOBJ
 * @return always 1
 */
static int WriteResponseTask(void *pData)
{
  pRemoteOBJ self = (pRemoteOBJ)pData;
  int status, length = 0, transID, finished;
  char *pText, *outTxt;
  json_object *message = NULL;
  json_object *transObj = NULL, *flagObj = NULL, *dataObj = NULL;
  enum json_tokener_error tokerr;
  OutCode eOut;
  writeData WD;

  if(!ANETvalidHandle(self->handle)) {
    return 1;
  }

  pText = ANETreadPtr(self->handle,&length);
  while(length > 0){
    json_tokener_reset(self->jtok);
    message = json_tokener_parse_ex(self->jtok,pText,length);
    tokerr = self->jtok->err;
    if(tokerr == json_tokener_continue){
      /* incomplete message: leave the data in the buffer and try again later */
      return 1;
    } else if(tokerr != json_tokener_success) {
      traceIO("RO","JSON parsing error %s on %s from %s %d",
	      json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset);
      ANETreadConsume(self->handle,length);
      return 1;
    }
    if(json_object_get_type(message) != json_type_object) {
      traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
      ANETreadConsume(self->handle,length);
      json_object_put(message);
      return 1;
    }

    /*
      Look up the members before consuming, so that the raw text is still
      in the read buffer when a problem has to be traced.
    */
    transObj = json_object_object_get(message,"trans");
    flagObj = json_object_object_get(message,"flag");
    dataObj = json_object_object_get(message,"data");
    if(transObj == NULL || flagObj == NULL || dataObj == NULL){
      if(transObj == NULL){
	traceIO("RO","No transaction ID found in %s from %s", pText,self->host);
      } else if(flagObj == NULL){
	traceIO("RO","No flag found in %s from %s", pText,self->host);
      } else {
	traceIO("RO","No data found in %s from %s", pText,self->host);
      }
      ANETreadConsume(self->handle,self->jtok->char_offset);
      json_object_put(message);
      return 1;
    }

    /*
      we need to consume here what has been parsed.
      The char_offset in the tokenizer structure might tell us that...
    */
    ANETreadConsume(self->handle,self->jtok->char_offset);

    /*
      Received a valid message, process
    */
    transID = json_object_get_int(transObj);
    outTxt = (char *)json_object_get_string(flagObj);
    eOut = findOutCode(outTxt);
    pText = (char *)json_object_get_string(dataObj);
    traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);

    /*
      do nothing on Poch
    */
    if(transID == POCHACT){
      json_object_put(message);
      length = 0;
      pText = ANETreadPtr(self->handle,&length);
      continue;
    }

    /*
      process update messages
    */
    if(transID == READACT){
      InterpExecute(pServ->pSics,pServ->dummyCon,pText);
      traceIO("RO","Received %s from remote",pText);
      json_object_put(message);
      length = 0;
      pText = ANETreadPtr(self->handle,&length);
      continue;
    }

    /*
      Everything else belongs to a write transaction. Every reaction except
      COMSTART ends the task run, further messages are processed in the
      next run.
    */
    finished = 0;
    status = LLDnodePtr2First(self->writeList);
    while(status == 1){
      LLDblobData(self->writeList,&WD);
      if(WD.transID == transID && strstr(pText,"COMSTART") == NULL){
	if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) {
	  SCDeleteConnection(WD.pCon);
	  LLDblobDelete(self->writeList);
	} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) {
	  /* skip: the transaction ends with TASKEND */
	} else if(strstr(pText,"TASKSTART") != NULL){
	  WD.waitTask = 1 ;
	  LLDblobDelete(self->writeList);
	  LLDblobAppend(self->writeList,&WD, sizeof(writeData));
	} else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){
	  SCDeleteConnection(WD.pCon);
	  LLDblobDelete(self->writeList);
	} else {
	  SCWrite(WD.pCon,pText,eOut);
	}
	finished = 1;
	break;
      }
      status = LLDnodePtr2Next(self->writeList);
    }

    /* pText points into message, it is invalid from here on */
    json_object_put(message);
    message = NULL;
    if(finished){
      return 1;
    }
    length = 0;
    pText = ANETreadPtr(self->handle,&length);
  }
  return 1;
}
/*---------------------------------------------------------------------------------*/
/**
 * Hand out the next transaction ID. The counter is advanced by one and
 * wraps around to 100000 when it would reach 200000.
 *
 * @return the new value of transactionID
 */
static int IncrementTransactionID()
{
  int candidate = transactionID + 1;

  transactionID = (candidate >= 200000) ? 100000 : candidate;
  return transactionID;
}
/*---------------------------------------------------------------------------------*/
/**
 * Hipadaba callback installed by ConnectWrite. On a set message the new
 * value is sent to the remote node named in the remotewrite property as
 * "contextdo <transID> hset <remote node> <value>". The transaction is
 * entered into the write list together with a copy of the connection so
 * that WriteResponseTask can forward the replies.
 *
 * The list entry is only made after the command has been written
 * successfully. A failed write releases the copied connection instead of
 * leaving a list entry behind for which no reply will ever arrive.
 *
 * @param currentNode the local node being set
 * @param userData    the RemoteOBJ
 * @param mes         the message to process
 * @return hdbAbort when the value could not be sent, hdbContinue otherwise
 */
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
                            pHdbMessage mes)
{
  pHdbDataMessage mm = NULL;
  int status, length;
  pRemoteOBJ self = (pRemoteOBJ)userData;
  SConnection *pCon = NULL;
  pDynString data;
  char *remoteNode;
  char *command;
  writeData WD;

  if((mm = GetHdbSetMessage(mes)) == NULL){
    return hdbContinue;
  }
  pCon = (SConnection *)mm->callData;

  remoteNode = GetHdbProp(currentNode,"remotewrite");
  if(remoteNode == NULL){
    /* without a target there is nothing to forward */
    return hdbContinue;
  }

  /*
    build the command to send
  */
  data = formatValue(*(mm->v),currentNode);
  if(data == NULL){
    if(pCon != NULL){
      SCWrite(pCon,"ERROR: out of memory writing remote node",eError);
    }
    return hdbAbort;
  }
  length = 40 + strlen(remoteNode) + GetDynStringLength(data);
  command = malloc(length*sizeof(char));
  if(command == NULL){
    DeleteDynString(data);
    if(pCon != NULL){
      SCWrite(pCon,"ERROR: out of memory writing remote node",eError);
    }
    return hdbAbort;
  }
  WD.pCon = SCCopyConnection(pCon);
  WD.waitTask = 0;
  WD.transID = IncrementTransactionID();
  snprintf(command,length,"contextdo %d hset %s %s\r\n",
	   WD.transID, remoteNode, GetCharArray(data));

  /*
    write
  */
  traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
  status = ANETwrite(self->handle,command,strlen(command));
  free(command);
  DeleteDynString(data);
  /* 1 is the success value of ANETwrite everywhere else in this file */
  if(status != 1){
    if(WD.pCon != NULL){
      SCDeleteConnection(WD.pCon);
    }
    if(pCon != NULL){
      SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host);
    }
    return hdbAbort;
  }
  LLDblobAppend(self->writeList,&WD,sizeof(writeData));
  return hdbContinue;
}
/*------------------------------------------------------------------------------*/
/**
 * Connect a local node for writing to a node in the remote server: the
 * remotewrite property is set, ROWriteCallback is installed on the local
 * node and the type of the remote node is then checked with hinfo.
 *
 * Property and callback are installed before the remote is queried and stay
 * in place when a later check fails; this is unchanged.
 *
 * @param self the remote object
 * @param pCon connection for error reporting
 * @param rd   local and remote node path
 * @return 1 on success, 0 on error
 */
static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
{
  pHdb localNode = NULL;
  char command[1024], reply[1024], *pPtr;
  int status, type;

  memset(reply,0,sizeof(reply));
  localNode = FindHdbNode(NULL,rd.localNode, pCon);
  if(localNode == NULL){
    SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
    return 0;
  }

  /*
   * Refuse duplicate connections; report the node which is really connected
   */
  pPtr = GetHdbProp(localNode,"remotewrite");
  if(pPtr != NULL){
    SCPrintf(pCon,eError,"ERROR: %s alread connected to %s", rd.localNode,
	     pPtr);
    return 0;
  }

  SetHdbProperty(localNode,"remotewrite",rd.remoteNode);
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback,
							 self,NULL));

  /*
   * Get information about the remote node and check compatability.
   * One byte of reply is held back so that it always stays terminated.
   */
  snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
  status = transactCommand(self->transactHandle,command,reply,sizeof(reply)-1);
  if(status != 1){
    SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
	     self->host);
    MarkDisconnected(self);
    return 0;
  }
  if(strstr(reply, "ERROR") != NULL){
    SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
	     reply, rd.remoteNode);
    return 0;
  }

  /* only interested in type: answer is of style: type,nochildren,length */
  pPtr = strchr(reply,',');
  if(pPtr == NULL){
    /* a reply without comma cannot be cut at the type field */
    SCPrintf(pCon,eError,"ERROR: malformed hinfo reply %s for remote node %s",
	     reply, rd.remoteNode);
    return 0;
  }
  *pPtr= '\0';
  type = convertHdbType(reply);
  if(type != localNode->value.dataType){
    SCPrintf(pCon,eError,
	     "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
	     rd.localNode, rd.remoteNode, localNode->value.dataType, type);
    return 0;
  }
  return 1;
}
/*---------------------------------------------------------------------------------*/
/**
 * Command wrapper for connectwrite: takes the local and remote node path
 * from the first two parameters and calls ConnectWrite.
 *
 * @param ccmd    the remote object, its pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode not used
 * @param par     parameter nodes: par[0] local node, par[1] remote node
 * @param nPar    number of entries in par
 * @return the status of ConnectWrite, 0 when parameters are missing
 */
static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
			  Hdb * cmdNode, Hdb * par[], int nPar)
{
  ReadData rd;
  int status;
  pRemoteOBJ self;

  if(nPar < 2 || par[0]->value.v.text == NULL || par[1]->value.v.text == NULL) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for connectwrite",
	    eError);
    return 0;
  }

  /*
   * Initialize.... snprintf always terminates the copies, unlike strncpy
   */
  memset(&rd,0,sizeof(rd));
  snprintf(rd.localNode, sizeof(rd.localNode), "%s", par[0]->value.v.text);
  snprintf(rd.remoteNode, sizeof(rd.remoteNode), "%s", par[1]->value.v.text);
  self = (pRemoteOBJ)ccmd->pPrivate;

  status = ConnectWrite(self,pCon,rd);
  if(status == 1){
    SCSendOK(pCon);
  }
  return status;
}
/*============================ remote execute =================================*/
/**
 * Execute a command in the remote server through the transaction
 * connection and write the answer to pCon.
 *
 * transactCommand reports success with 1 and failures with negative codes
 * (OOM, TO) or the ANETwrite status. Only 1 is therefore treated as success;
 * testing for non-zero would pass an empty answer on as a value after a
 * timeout.
 *
 * @param self    the remote object
 * @param pCon    connection receiving the answer or the error message
 * @param command the command to execute remotely
 * @return 1 on success, 0 on failure
 */
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
{
  int status;
  char answer[65536];

  /*
    write, thereby taking care to prefix with transact and for proper termination
  */
  memset(answer,0,sizeof(answer));
  status = transactCommand(self->transactHandle,command,answer,sizeof(answer)-1);
  if(status == 1){
    SCWrite(pCon,answer,eValue);
    return 1;
  }
  if(status == TO){
    SCPrintf(pCon,eError,"ERROR: timeout waiting for reply from %s %d",
	     self->host, self->port);
  } else {
    SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
  }
  return 0;
}
/*------------------------------------------------------------------------------*/
/**
 * Command wrapper for exe: the formatted values of all parameters are
 * joined, each preceded by a blank, and the result is handed to
 * RemoteExecute.
 *
 * @param ccmd    the remote object, its pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode not used
 * @param par     parameter nodes making up the command
 * @param nPar    number of entries in par
 * @return the status of RemoteExecute
 */
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
			  Hdb * cmdNode, Hdb * par[], int nPar)
{
  pRemoteOBJ self = (pRemoteOBJ)ccmd->pPrivate;
  Tcl_DString cmdLine;
  pDynString formatted;
  int idx, result;

  Tcl_DStringInit(&cmdLine);
  for (idx = 0; idx < nPar; idx++) {
    formatted = formatValue(par[idx]->value, par[idx]);
    if (formatted == NULL) {
      /* parameters which cannot be formatted are left out */
      continue;
    }
    Tcl_DStringAppend(&cmdLine, " ", 1);
    /* a negative length makes Tcl take everything up to the terminator */
    Tcl_DStringAppend(&cmdLine, GetCharArray(formatted), -1);
    DeleteDynString(formatted);
  }

  result = RemoteExecute(self,pCon,Tcl_DStringValue(&cmdLine));
  Tcl_DStringFree(&cmdLine);
  return result;
}
/*============================= connect command ================================*/
/**
 * Command wrapper for connect: calls ConnectRemoteObject and reports the
 * outcome.
 *
 * @param ccmd    the remote object, its pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode not used
 * @param par     not used
 * @param nPar    not used
 * @return 1 when the object is connected afterwards, 0 otherwise
 */
static int ConnectCmd(pSICSOBJ ccmd, SConnection * pCon,
			  Hdb * cmdNode, Hdb * par[], int nPar)
{
  pRemoteOBJ self = (pRemoteOBJ)ccmd->pPrivate;

  ConnectRemoteObject(self);
  if(!self->connected){
    SCPrintf(pCon,eError,"ERROR: failed to connect to %s %d", self->host, self->port);
    return 0;
  }
  SCSendOK(pCon);
  return 1;
}
/*============================ object initialisation etc =======================*/
/**
 * Factory command: creates a remote object named argv[1] for the SICS server
 * at host argv[2], port argv[3]. The object gets the commands connectread,
 * connectwrite, exe and connect, and two tasks are registered: the heartbeat
 * ("ro-<host>-<port>") and the response processing ("rocom-<host>-<port>").
 *
 * Whatever has been allocated is released again when one of the allocations
 * fails.
 *
 * @param pCon  connection for replies
 * @param pSics interpreter to install the new command into
 * @param pData not used
 * @param argc  number of arguments
 * @param argv  arguments: command name, object name, host, port
 * @return the status of AddCommand, 0 on errors detected before
 */
static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
                            int argc, char *argv[])
{
  pSICSOBJ pNew = NULL;
  pRemoteOBJ self = NULL;
  int status;
  pHdb cmd;
  char roTaskName[256];

  if(argc < 4) {
    SCWrite(pCon,"ERROR: need name and remote host name and port in order to create remote object",
	    eError);
    return 0;
  }

  strtolower(argv[1]);
  if(FindCommand(pSics,argv[1]) != NULL){
    SCPrintf(pCon,eError, "ERROR: command %s already exists!", argv[1]);
    return 0;
  }

  pNew = MakeSICSOBJ(argv[1],"RemoteOBJ");
  self = calloc(1, sizeof(RemoteOBJ));
  if(self != NULL){
    self->host = strdup(argv[2]);
  }
  if(pNew == NULL || self == NULL || self->host == NULL){
    /* pPrivate is not yet set, so the two pieces are released separately */
    if(self != NULL){
      free(self->host);
      free(self);
    }
    if(pNew != NULL){
      KillSICSOBJ(pNew);
    }
    SCWrite(pCon,"ERROR: out of memory creating remote object", eError);
    return 0;
  }
  pNew->pPrivate = self;
  pNew->KillPrivate = KillRemoteOBJ;
  self->port = atoi(argv[3]);
  self->readList = LLDblobCreate();
  self->writeList = LLDblobCreate();
  self->jtok = json_tokener_new();
  ConnectRemoteObject(self);

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connectread", usMugger, MakeSICSFunc(ConnectreadCmd));
  AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
  AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connectwrite", usMugger, MakeSICSFunc(ConnectwriteCmd));
  AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
  AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "exe", usMugger, MakeSICSFunc(RemoteExecuteCmd));
  AddSICSHdbPar(cmd, "args", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connect", usMugger, MakeSICSFunc(ConnectCmd));

  status = AddCommand(pSics,
		      argv[1],
		      InterInvokeSICSOBJ,
		      KillSICSOBJ, pNew);

  snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
  TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
  snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port);
  TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
  return status;
}
/*----------------------------------------------------------------------------------------*/
/**
 * Implementation of the addremotecb command: installs ROUpdateCallback on
 * the node argv[1] of this server. The callback sends its update commands
 * for the node path argv[2] through a copy of the calling connection.
 *
 * @param pCon  the calling connection; a copy of it is kept by the callback
 * @param pSics not used
 * @param pData not used
 * @param argc  number of arguments
 * @param argv  arguments: command name, node in this server, node path to
 *              put into the update commands
 * @return 1 on success, 0 on error
 */
static int AddRemoteCallback(SConnection *pCon, SicsInterp *pSics, void *pData,
                            int argc, char *argv[])
{
  pHdb localNode = NULL;
  pUpdateCallback up = NULL;

  if(argc < 3) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for updatecb",
	    eError);
    return 0;
  }

  localNode = FindHdbNode(NULL,argv[1], pCon);
  if(localNode == NULL){
    SCPrintf(pCon,eError,"ERROR: local node %s not found", argv[1]);
    return 0;
  }

  up = malloc(sizeof(UpdateCallback));
  if(up == NULL){
    SCWrite(pCon,"ERROR: out of memory installing update callback",eError);
    return 0;
  }
  up->sendCon = SCCopyConnection(pCon);
  up->remotePath = strdup(argv[2]);
  if(up->sendCon == NULL || up->remotePath == NULL){
    /* ROUpdateCallback uses both members without checking them */
    if(up->sendCon != NULL){
      SCDeleteConnection(up->sendCon);
    }
    free(up->remotePath);
    free(up);
    SCWrite(pCon,"ERROR: out of memory installing update callback",eError);
    return 0;
  }
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROUpdateCallback,
                                                        up,KillUpdateStruct));

  /**
   * This is meant to send an update immediatly such that the remote node
   * is updated right away,
   */
  NotifyHipadabaPar(localNode, NULL);
  SCSendOK(pCon);
  return 1;
}
/*----------------------------------------------------------------------------------------*/
void RemoteObjectInit(void)
{
AddCommand(pServ->pSics,
"makeremo",
MakeRemoteObject,
NULL,NULL);
AddCommand(pServ->pSics,
"addremotecb",
AddRemoteCallback,
NULL,NULL);
}