Files
sics/remoteobject.c
2017-01-13 15:43:39 +01:00

974 lines
28 KiB
C

/**
* Remote objects in sicsobj. This means accessing remote objects in a different
* SICS server from a master SICS server.
*
* Reading is implemented according to this scheme:
*
* * When a read connection is made between a local node and a remote node in slave, then a
* callback is installed on remote node in slave.
* * The callback on remote in slave sends commands to update the local node when either the
* value or the geterror property changes on the remote node. The commands sent are
* enclosed in special delimiters
* * A special ANET callback evaluates the data coming from slave and acts accordingly, thus
* updating the local node. This is driven by the general network driving code of SICS
*
* * in order to detect availability and re-availability of slave a Heartbeat Task sends a
* heartbeat message to slave. Thereby testing the connection regularly, trying to reconnect etc.
*
* COPYRIGHT: see file COPYRIGHT
*
* Mark Koennecke, February-May 2015
**/
#include <unistd.h>
#include <time.h>
#include <tcl.h>
#include <mxml.h>
#include <sics.h>
#include <sicsobj.h>
#include <sicshipadaba.h>
#include <asynnet.h>
#include <lld.h>
#include <lld_blob.h>
#include <dynstring.h>
#include <stptok.h>
#include <json-c/json.h>
/* private status codes returned by transactCommand() */
#define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */
/* contextdo ID used when installing read (update) callbacks on the remote server */
#define READACT 7654
/*
 * contextdo ID of the heartbeat; replies carrying it are ignored by
 * WriteResponseTask. NOTE(review): HeartbeatTask spells 8437 out literally
 * in its command string, keep both in sync.
 */
#define POCHACT 8437
/* login line written to both connections right after they have been opened */
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
/* defined outside of this file */
extern char *trim(char *txt);
/* last transaction ID handed out by IncrementTransactionID(), kept within 100000..199999 */
static int transactionID = 100000;
/*---------------------- our very private data structure -------------------*/
/* State of the link to one remote SICS server; allocated in MakeRemoteObject */
typedef struct {
char *host; /* remote host name, a strdup'ed copy released in KillRemoteOBJ */
int port; /* remote port number */
int handle; /* ANET handle for callbacks, heartbeats, writes and their JSON replies */
int transactHandle; /* ANET handle used for the synchronous transactCommand() exchanges */
int readList; /* LLD blob list of ReadData entries, one per connected read node */
int writeList; /* LLD blob list of writeData entries, one per write still in progress */
unsigned int connected; /* non zero once ConnectRemoteObject has succeeded */
time_t nextHeartbeat; /* time after which HeartbeatTask sends the next heartbeat */
struct json_tokener *jtok; /* JSON parser used by WriteResponseTask */
} RemoteOBJ, *pRemoteOBJ;
/*----------------------------------------------------------------------------*/
/* A pair of node paths as stored in the read list and passed to ConnectRead/ConnectWrite */
typedef struct {
char localNode[1024]; /* path of the node in this (master) server */
char remoteNode[1024]; /* path of the matching node in the remote server */
} ReadData, *pReadData;
/*---------------------------------------------------------------------------*/
/* User data of ROUpdateCallback; created in AddRemoteCallback, released by KillUpdateStruct */
typedef struct {
SConnection *sendCon; /* copy of the connection on which update commands are written */
char *remotePath; /* strdup'ed node path inserted into the hupdate/hsetprop/hdelprop commands */
} UpdateCallback, *pUpdateCallback;
/*----------------------------------------------------------------------------*/
/* Entry of the write list: one write which has been sent to the remote server */
typedef struct {
int transID; /* transaction ID sent with the contextdo command */
SConnection *pCon; /* copy of the connection which receives the forwarded replies */
int waitTask; /* set to 1 by CheckWriteList once TASKSTART was seen; TASKEND then ends the entry */
}writeData, *pWriteData;
/*----------------------------------------------------------------------------*/
/**
 * Release the resources held by a remote object: stop its tasks, close both
 * network handles and delete the lists, the host name and the JSON tokener.
 *
 * Both tasks registered in MakeRemoteObject ("ro-..." and "rocom-...") are
 * stopped; the task name buffer has the same size as the one used at
 * registration so that long host names produce identical task names.
 *
 * NOTE(review): as before, the RemoteOBJ structure itself is not freed here;
 * confirm who owns it before adding a free().
 *
 * @param data pointer to the RemoteOBJ, may be NULL
 */
void KillRemoteOBJ(void *data)
{
  char taskName[256];
  pRemoteOBJ self = (pRemoteOBJ) data;

  if (self == NULL) {
    return;
  }

  snprintf(taskName, sizeof(taskName), "ro-%s-%d", self->host, self->port);
  StopTask(pServ->pTasker, taskName);
  /* the response task reads self->handle and self->jtok, so stop it too */
  snprintf(taskName, sizeof(taskName), "rocom-%s-%d", self->host, self->port);
  StopTask(pServ->pTasker, taskName);

  free(self->host);
  ANETclose(self->handle);
  ANETclose(self->transactHandle);
  LLDdeleteBlob(self->readList);
  LLDdeleteBlob(self->writeList);
  json_tokener_free(self->jtok);
}
/*-----------------------------------------------------------------------------*/
/**
 * Send a command prefixed with "transact " to the remote server and wait
 * for the answer, which the remote terminates with TRANSACTIONFINISHED.
 *
 * @param handle   ANET handle to talk to
 * @param command  command text; "\r\n" is appended when it holds no newline
 * @param reply    buffer receiving the answer text before TRANSACTIONFINISHED
 * @param replyLen size of reply; the result is always NUL terminated and
 *                 truncated to replyLen-1 characters when necessary
 * @return 1 on success, OOM when out of memory, TO on timeout or the
 *         status of ANETwrite when sending failed
 */
static int transactCommand(int handle, char *command, char *reply, int replyLen)
{
  char *toSend = NULL;
  char *prefix = {"transact "};
  int status, length = 0;
  time_t start;
  char *pPtr, *pEnd;

  /*
   * read possible dirt off the line
   */
  pPtr = ANETreadPtr(handle, &length);
  ANETreadConsume(handle, length);

  /* the extra 10 bytes cover the optional "\r\n" and the terminator */
  toSend = malloc(strlen(command) + strlen(prefix) + 10);
  if (toSend == NULL) {
    return OOM;
  }
  strcpy(toSend, prefix);
  strcat(toSend, command);
  if (strstr(command, "\n") == NULL) {
    strcat(toSend, "\r\n");
  }
  status = ANETwrite(handle, toSend, strlen(toSend));
  free(toSend);
  if (status != 1) {
    return status;
  }

  /*
   * wait for a reply for max 2 seconds
   */
  start = time(NULL);
  while (time(NULL) < start + 2.0) {
    ANETprocess();
    pPtr = ANETreadPtr(handle, &length);
    if (length > 0 && pPtr != NULL
        && (pEnd = strstr(pPtr, "TRANSACTIONFINISHED")) != NULL) {
      *pEnd = '\0';
      if (reply != NULL && replyLen > 0) {
        /* strncpy does not terminate when the source fills the buffer */
        strncpy(reply, pPtr, replyLen - 1);
        reply[replyLen - 1] = '\0';
      }
      ANETreadConsume(handle, length);
      return 1;
    }
    usleep(100);
  }

  /*
   * here we have run into a timeout
   */
  ANETreadConsume(handle, length);
  return TO;
}
/*----------------------------------------------------------------------------*/
/**
 * Open both connections to the remote server, log in, switch the
 * asynchronous connection to the JSON protocol and (re)install the update
 * callbacks for all nodes in the read list. Does nothing when the object
 * is already marked as connected. On success self->connected is set to 1.
 *
 * When only one of the two connects succeeds, the open handle is closed
 * again: otherwise it would leak, and the heartbeat would keep writing to
 * a connection which was never logged in.
 *
 * @param self the remote object to connect
 */
static void ConnectRemoteObject(pRemoteOBJ self)
{
  char command[1024];
  int length, status;
  ReadData rd;
  pHdb node;

  if (self->connected) {
    return;
  }

  self->handle = ANETconnect(self->host, self->port);
  self->transactHandle = ANETconnect(self->host, self->port);
  if (self->handle < 0 || self->transactHandle < 0) {
    if (self->handle >= 0) {
      ANETclose(self->handle);
      self->handle = -1;
    }
    if (self->transactHandle >= 0) {
      ANETclose(self->transactHandle);
      self->transactHandle = -1;
    }
    self->connected = 0;
    traceIO("RO","Failed to connect to remote objects at %s, port %d",
            self->host, self->port);
    return;
  }
  traceIO("RO","Connected to %s, port %d for remote objects",
          self->host, self->port);

  /*
    Default login with hard coded manager login. Defined in
    nserver.c
  */
  ANETwrite(self->handle, login, strlen(login));
  ANETwrite(self->transactHandle, login, strlen(login));
  usleep(500);
  ANETprocess();

  /*
    eat the login responses
  */
  (void) ANETreadPtr(self->handle, &length);
  ANETreadConsume(self->handle, length);
  (void) ANETreadPtr(self->transactHandle, &length);
  ANETreadConsume(self->transactHandle, length);

  transactCommand(self->handle, "protocol set json\r\n", command, sizeof(command)-1);

  /*
   * Remove geterror on read nodes and reinstall callbacks for reconnects
   */
  status = LLDnodePtr2First(self->readList);
  while (status != 0) {
    LLDblobData(self->readList, &rd);
    node = FindHdbNode(NULL, rd.localNode, NULL);
    if (node != NULL) {
      SetHdbProperty(node, "geterror", NULL);
      snprintf(command, sizeof(command), "contextdo %d addremotecb %s %s \r\n",
               READACT, rd.remoteNode, rd.localNode);
      ANETwrite(self->handle, command, strlen(command));
    }
    status = LLDnodePtr2Next(self->readList);
  }
  self->connected = 1;
}
/*-----------------------------------------------------------------------------*/
/**
 * Flag the remote object as disconnected: every local node found through
 * the read list gets a geterror property, then the connected flag is cleared.
 *
 * @param self the remote object which lost its connection
 */
static void MarkDisconnected(pRemoteOBJ self)
{
  ReadData entry;
  pHdb local;
  int more;

  for (more = LLDnodePtr2First(self->readList); more != 0;
       more = LLDnodePtr2Next(self->readList)) {
    LLDblobData(self->readList, &entry);
    local = FindHdbNode(NULL, entry.localNode, NULL);
    if (local == NULL) {
      continue;
    }
    SetHdbProperty(local, "geterror", "Disconnected from remote server");
  }
  self->connected = 0;
}
/*-----------------------------------------------------------------------------*/
/**
 * Callback installed by AddRemoteCallback on a node of the slave. It turns
 * value updates into "hupdate" commands and property changes into
 * "hsetprop"/"hdelprop" commands and writes them to the stored connection.
 *
 * The formatted value is now released when the command buffer cannot be
 * allocated, and a NULL result of formatValue is tolerated.
 *
 * @param currentNode the node which changed
 * @param userData    the UpdateCallback structure of this callback
 * @param mes         the message to process
 * @return hdbKill when the connection is no longer connected, which removes
 *         the callback; hdbContinue otherwise
 */
static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
                                          pHdbMessage mes)
{
  pUpdateCallback uppi = (pUpdateCallback)userData;
  hdbDataMessage *mm = NULL;
  pDynString text;
  char *prefix = {"hupdate "};
  char *postfix= {" \r\n"};
  char *txt = NULL;
  int length;
  pHdbPropertyChange propChange = NULL;

  mm = GetHdbUpdateMessage(mes);
  if (mm != NULL) {
    /*
     * remove myself when the connection is dead...
     */
    if (!SCisConnected(uppi->sendCon)) {
      return hdbKill;
    }
    /*
     * format and send the update command to master
     */
    text = formatValue(*(mm->v), currentNode);
    if (text == NULL) {
      return hdbContinue;
    }
    /* +5: the separating blanks added by the format plus the terminator */
    length = GetDynStringLength(text) +
      strlen(prefix) + strlen(postfix) + strlen(uppi->remotePath) + 5;
    txt = malloc(length*sizeof(char));
    if (txt == NULL) {
      DeleteDynString(text);
      return hdbContinue;
    }
    snprintf(txt, length, "%s %s %s %s", prefix, uppi->remotePath,
             GetCharArray(text), postfix);
    SCWrite(uppi->sendCon, txt, eValue);
    free(txt);
    DeleteDynString(text);
  }

  propChange = GetPropertyChangeMessage(mes);
  if (propChange != NULL) {
    /*
     * remove myself when the connection is dead...
     */
    if (!SCisConnected(uppi->sendCon)) {
      return hdbKill;
    }
    length = strlen("hdelprop ") + strlen(uppi->remotePath) +
      strlen(propChange->key) + 10;
    if (propChange->value != NULL) {
      length += strlen(propChange->value);
    }
    txt = malloc(length*sizeof(char));
    if (txt == NULL) {
      return hdbContinue;
    }
    /* a property without a value has been deleted */
    if (propChange->value == NULL) {
      snprintf(txt, length, "hdelprop %s %s %s", uppi->remotePath,
               propChange->key, postfix);
    } else {
      snprintf(txt, length, "hsetprop %s %s %s %s", uppi->remotePath,
               propChange->key, propChange->value, postfix);
    }
    SCWrite(uppi->sendCon, txt, eValue);
    free(txt);
  }
  return hdbContinue;
}
/*-----------------------------------------------------------------------------*/
/**
 * Get callback installed on connected read nodes. While the node carries a
 * geterror property, the error is written to the requesting connection,
 * text values are replaced by the error text and the get is aborted.
 *
 * @param currentNode the node being read
 * @param userData    not used
 * @param mes         the message to process
 * @return hdbAbort when a geterror property is set on a get message,
 *         hdbContinue otherwise
 */
static hdbCallbackReturn GetErrorCallback(pHdb currentNode, void *userData,
                                          pHdbMessage mes)
{
  hdbDataMessage *getMsg = GetHdbGetMessage(mes);
  SConnection *con = NULL;
  char *problem;
  char error[512];

  if (getMsg == NULL) {
    return hdbContinue;
  }

  con = getMsg->callData;
  problem = GetHdbProp(currentNode, "geterror");
  if (problem == NULL) {
    return hdbContinue;
  }

  snprintf(error, sizeof(error), "ERROR: %s", problem);
  SCWrite(con, error, eError);
  if (getMsg->v->dataType == HIPTEXT) {
    /* free() accepts NULL, so no separate check is required */
    free(getMsg->v->v.text);
    getMsg->v->v.text = strdup(error);
  }
  return hdbAbort;
}
/*-----------------------------------------------------------------------------*/
/**
 * Destructor for the user data of ROUpdateCallback: deletes the copied
 * connection and frees the path and the structure itself.
 *
 * @param data pointer to an UpdateCallback structure, may be NULL
 */
static void KillUpdateStruct(void *data)
{
  pUpdateCallback victim = (pUpdateCallback)data;

  if (victim == NULL) {
    return;
  }
  SCDeleteConnection(victim->sendCon);
  free(victim->remotePath);
  free(victim);
}
/*-----------------------------------------------------------------------------*/
/**
 * Connect a local node for reading to a node in the remote server.
 *
 * The remote node is queried with hinfo and its type is compared with the
 * type of the local node. On success the pair is entered into the read
 * list, GetErrorCallback is installed on the local node and the remote is
 * asked to install its update callback. When the remote cannot be reached,
 * even after a reconnect, the pair is registered nevertheless so that the
 * callback gets installed at the next successful connect.
 *
 * The reply buffer is zeroed and passed one byte short so that it is always
 * terminated, and a reply without a comma is reported instead of being
 * dereferenced.
 *
 * @param self the remote object
 * @param pCon connection for error messages
 * @param rd   paths of the local and the remote node
 * @return 1 on success or when registered while disconnected, 0 on error
 */
static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
{
  char command[1024], reply[1024], *pPtr;
  int status, type;
  pHdb localNode = NULL;

  /*
   * Initialize....
   */
  localNode = FindHdbNode(NULL, rd.localNode, pCon);
  if (localNode == NULL) {
    SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
    return 0;
  }

  /**
   * Refuse duplicate connections
   */
  pPtr = GetHdbProp(localNode, "remoteread");
  if (pPtr != NULL) {
    SCPrintf(pCon,eError,"ERROR: %s is already connected to %s", rd.localNode, pPtr);
    return 0;
  }

  /*
   * Get information about the remote node and check compatibility
   */
  memset(reply, 0, sizeof(reply));
  snprintf(command, sizeof(command), "hinfo %s\r\n", rd.remoteNode);
  status = transactCommand(self->transactHandle, command, reply, sizeof(reply)-1);
  if (status != 1) {
    /*
     * try a reconnect,
     * when fails:
     *   Warning
     *   add blob
     */
    self->connected = 0;
    ConnectRemoteObject(self);
    status = transactCommand(self->transactHandle, command, reply, sizeof(reply)-1);
    if (status != 1) {
      SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
               self->host);
      MarkDisconnected(self);
      LLDblobAdd(self->readList, &rd, sizeof(rd));
      AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
                                                             NULL,NULL));
      SetHdbProperty(localNode, "remoteread", rd.remoteNode);
      return 1;
    }
  }
  if (strstr(reply, "ERROR") != NULL) {
    SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
             reply, rd.remoteNode);
    return 0;
  }

  /* only interested in type: answer is of style: type,nochildren,length */
  pPtr = strchr(reply, ',');
  if (pPtr == NULL) {
    SCPrintf(pCon,eError,"ERROR: cannot parse hinfo reply %s for remote node %s",
             reply, rd.remoteNode);
    return 0;
  }
  *pPtr = '\0';
  type = convertHdbType(reply);
  if (type != localNode->value.dataType) {
    SCPrintf(pCon,eError,
             "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
             rd.localNode, rd.remoteNode, localNode->value.dataType, type);
    return 0;
  }

  /*
   * Make an entry in the read list
   */
  LLDblobAdd(self->readList, &rd, sizeof(rd));
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
                                                         NULL,NULL));
  SetHdbProperty(localNode, "remoteread", rd.remoteNode);

  /*
   * Install a callback on the remote node to update the master. The remote should
   * then immediately send an update which will be processed by the read callback.
   */
  snprintf(command, sizeof(command), "contextdo %d addremotecb %s %s \r\n",
           READACT, rd.remoteNode, rd.localNode);
  ANETwrite(self->handle, command, strlen(command));
  return 1;
}
/*-----------------------------------------------------------------------------*/
/**
 * Command function behind the connectread node: takes the local and the
 * remote node path from the parameters and calls ConnectRead.
 *
 * The paths are copied into a zeroed structure leaving room for the
 * terminator, thus over long paths are truncated but stay valid strings.
 *
 * @param ccmd    the SICS object; pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode the command node, not used
 * @param par     parameter nodes: par[0] local path, par[1] remote path
 * @param nPar    number of entries in par
 * @return the status of ConnectRead, 0 when parameters are missing
 */
static int ConnectreadCmd(pSICSOBJ ccmd, SConnection * pCon,
                          Hdb * cmdNode, Hdb * par[], int nPar)
{
  ReadData rd;
  int status;
  pRemoteOBJ self;

  if (nPar < 2) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for connectread",
            eError);
    return 0;
  }

  /*
   * Initialize....
   */
  memset(&rd, 0, sizeof(rd));
  strncpy(rd.localNode, par[0]->value.v.text, sizeof(rd.localNode) - 1);
  strncpy(rd.remoteNode, par[1]->value.v.text, sizeof(rd.remoteNode) - 1);
  self = (pRemoteOBJ)ccmd->pPrivate;

  status = ConnectRead(self, pCon, rd);
  if (status == 1) {
    SCSendOK(pCon);
  }
  return status;
}
/*-----------------------------------------------------------------------------*/
/**
 * Task function which sends a heartbeat command to the remote server once
 * the time stored in nextHeartbeat has passed. A failing write triggers a
 * reconnect; when this fails too, the read nodes are marked as
 * disconnected. The next heartbeat is scheduled 10 seconds later.
 *
 * @param pData the RemoteOBJ
 * @return always 1
 */
static int HeartbeatTask(void *pData)
{
  pRemoteOBJ remote = (pRemoteOBJ)pData;
  char poch[] = "contextdo 8437 Poch\r\n";
  int written;

  /* not yet due */
  if (time(NULL) <= remote->nextHeartbeat) {
    return 1;
  }

  written = ANETwrite(remote->handle, poch, strlen(poch));
  if (written != 1) {
    traceIO("RO","Trying a reconnect to %s, %d", remote->host, remote->port);
    remote->connected = 0;
    ConnectRemoteObject(remote);
    if (!remote->connected) {
      MarkDisconnected(remote);
    }
  }
  remote->nextHeartbeat = time(NULL) + 10;
  return 1;
}
/*============================= writing related code ===========================
This works by sending the command via contextdo with an ID >= 10^5 (100000 to 199999). This causes
the remote SICS to send the termination messages. The transaction IDs together
with the connection responsible for it are kept in a list.
This list is used by the write task to forward messages properly and for handling
termination.
-----------------------------------------------------------------------------------*/
#include <outcode.h>
/**
 * Find the output code for a text: the index of the first entry of pCode
 * which occurs in txt.
 *
 * @param txt the text to search
 * @return the index of the matching entry, eValue when nothing matches
 */
static OutCode findOutCode(char *txt)
{
  int idx = 0;

  while (idx < iNoCodes) {
    if (strstr(txt, pCode[idx]) != NULL) {
      return idx;
    }
    idx++;
  }
  return eValue;
}
/*-----------------------------------------------------------------------------------*/
/**
 * Handle a message belonging to a write transaction. The write list is
 * searched for the transaction ID; depending on the content of the text
 * the entry is finished (COMEND without a task, or TASKEND), marked as
 * waiting for a task (TASKSTART) or the text is forwarded to the
 * connection stored with the entry.
 *
 * @param writeList the LLD blob list of writeData entries
 * @param transID   the transaction ID of the message
 * @param eOut      the output code to use when forwarding
 * @param pText     the message text
 */
static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText)
{
  writeData entry;
  int more;

  for (more = LLDnodePtr2First(writeList); more == 1;
       more = LLDnodePtr2Next(writeList)) {
    LLDblobData(writeList, &entry);
    if (entry.transID != transID) {
      continue;
    }

    /* COMSTART is ignored, the search simply moves on */
    if (strstr(pText, "COMSTART") != NULL) {
      continue;
    }

    if (strstr(pText, "COMEND") != NULL) {
      if (entry.waitTask == 0) {
        /* no task was started: the transaction is complete */
        SCDeleteConnection(entry.pCon);
        LLDblobDelete(writeList);
        return;
      }
      if (entry.waitTask == 1) {
        /* a task is running, wait for its TASKEND */
        return;
      }
    }

    if (strstr(pText, "TASKSTART") != NULL) {
      /* store the modified entry by replacing the old one */
      entry.waitTask = 1;
      LLDblobDelete(writeList);
      LLDblobAppend(writeList, &entry, sizeof(writeData));
      return;
    }

    if (strstr(pText, "TASKEND") != NULL && entry.waitTask == 1) {
      SCDeleteConnection(entry.pCon);
      LLDblobDelete(writeList);
      return;
    }

    /* everything else which is not an OK goes to the client */
    if (strstr(pText, "OK") == NULL) {
      SCWrite(entry.pCon, pText, eOut);
    }
    return;
  }
}
/*-----------------------------------------------------------------------------------*/
/**
 * Task function processing the JSON messages arriving on the asynchronous
 * connection. Each message must be an object with the fields trans, flag
 * and data. Heartbeat replies (POCHACT) are dropped, messages with READACT
 * are executed locally when they hold hupdate or property commands, and all
 * others are passed to CheckWriteList.
 *
 * The parsed JSON object is now released on every path which leaves the
 * function after a successful parse; before, the bad type and the missing
 * field branches leaked it.
 *
 * @param pData the RemoteOBJ
 * @return always 1
 */
static int WriteResponseTask(void *pData)
{
  pRemoteOBJ self = (pRemoteOBJ)pData;
  int length = 0, transID;
  char *pText, *outTxt;
  json_object *message = NULL, *data = NULL;
  enum json_tokener_error tokerr;
  OutCode eOut;

  if (!ANETvalidHandle(self->handle)) {
    return 1;
  }

  pText = ANETreadPtr(self->handle, &length);
  while (length > 0) {
    json_tokener_reset(self->jtok);
    message = json_tokener_parse_ex(self->jtok, pText, length);
    tokerr = self->jtok->err;
    if (tokerr == json_tokener_continue) {
      /* incomplete message: leave the data in place and wait for more */
      return 1;
    } else if (tokerr != json_tokener_success) {
      traceIO("RO","JSON parsing error %s on %s from %s %d",
              json_tokener_error_desc(tokerr), pText, self->host, self->jtok->char_offset);
      ANETreadConsume(self->handle, length);
      return 1;
    }
    if (json_object_get_type(message) != json_type_object) {
      traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
      ANETreadConsume(self->handle, length);
      json_object_put(message);
      return 1;
    }
    /*
      we need to consume here what has been parsed.
      The char_offset in the tokenizer structure might tell us that...
      NOTE(review): pText still refers to the network buffer after this
      call and is used in the traces below; confirm that this is safe.
    */
    ANETreadConsume(self->handle, self->jtok->char_offset);

    /*
      Received a valid message, process
    */
    data = json_object_object_get(message, "trans");
    if (data == NULL) {
      traceIO("RO","No transaction ID found in %s from %s", pText,self->host);
      json_object_put(message);
      return 1;
    }
    transID = json_object_get_int(data);

    data = json_object_object_get(message, "flag");
    if (data == NULL) {
      traceIO("RO","No flag found in %s from %s", pText,self->host);
      json_object_put(message);
      return 1;
    }
    outTxt = (char *)json_object_get_string(data);
    eOut = findOutCode(outTxt);

    data = json_object_object_get(message, "data");
    if (data == NULL) {
      traceIO("RO","No data found in %s from %s", pText,self->host);
      json_object_put(message);
      return 1;
    }
    /* this string belongs to message and is valid until json_object_put */
    pText = (char *)json_object_get_string(data);
    traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);

    /*
      do nothing on Poch
    */
    if (transID == POCHACT) {
      pText = ANETreadPtr(self->handle, &length);
      json_object_put(message);
      continue;
    }

    /*
      process update messages
    */
    if (transID == READACT) {
      if (strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL) {
        InterpExecute(pServ->pSics, pServ->dummyCon, pText);
      }
      traceIO("RO","Received %s from remote",pText);
      pText = ANETreadPtr(self->handle, &length);
      json_object_put(message);
      continue;
    }

    /*
      check write List
    */
    CheckWriteList(self->writeList, transID, eOut, pText);
    json_object_put(message);
    pText = ANETreadPtr(self->handle, &length);
  }
  return 1;
}
/*---------------------------------------------------------------------------------*/
/**
 * Hand out the next transaction ID. The counter is advanced by one and
 * starts again at 100000 when it would reach 200000.
 *
 * @return the new transaction ID
 */
static int IncrementTransactionID()
{
  int next = transactionID + 1;

  transactionID = (next >= 200000) ? 100000 : next;
  return transactionID;
}
/*---------------------------------------------------------------------------------*/
/**
 * Set callback installed by ConnectWrite on a local node. A set message is
 * forwarded as "contextdo <transID> hset <remote node> <value>" to the
 * remote server, and an entry with the transaction ID and a copy of the
 * connection is appended to the write list for WriteResponseTask.
 *
 * A missing remotewrite property and a failed value formatting are now
 * reported instead of being dereferenced, and the formatted value is
 * released when the command buffer cannot be allocated.
 *
 * NOTE(review): as before, the command is not sent again after a
 * successful reconnect and the write list entry stays in the list when
 * sending failed.
 *
 * @param currentNode the local node being set
 * @param userData    the RemoteOBJ
 * @param mes         the message to process
 * @return hdbAbort on errors, hdbContinue otherwise
 */
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
                                         pHdbMessage mes)
{
  pHdbDataMessage mm = NULL;
  int status, length;
  pRemoteOBJ self = (pRemoteOBJ)userData;
  SConnection *pCon = NULL;
  pDynString data;
  char *remoteNode;
  char *command;
  writeData WD;

  mm = GetHdbSetMessage(mes);
  if (mm == NULL) {
    return hdbContinue;
  }
  pCon = (SConnection *)mm->callData;

  remoteNode = GetHdbProp(currentNode, "remotewrite");
  if (remoteNode == NULL) {
    if (pCon != NULL) {
      SCWrite(pCon,"ERROR: no remote node configured for writing",eError);
    }
    return hdbAbort;
  }

  /*
    build the command to send
  */
  data = formatValue(*(mm->v), currentNode);
  if (data == NULL) {
    if (pCon != NULL) {
      SCWrite(pCon,"ERROR: out of memory writing remote node",eError);
    }
    return hdbAbort;
  }
  /* 40 covers the fixed text, the transaction ID and the terminator */
  length = 40 + strlen(remoteNode) + GetDynStringLength(data);
  command = malloc(length*sizeof(char));
  if (command == NULL) {
    DeleteDynString(data);
    if (pCon != NULL) {
      SCWrite(pCon,"ERROR: out of memory writing remote node",eError);
    }
    return hdbAbort;
  }
  WD.pCon = SCCopyConnection(pCon);
  WD.waitTask = 0;
  WD.transID = IncrementTransactionID();
  snprintf(command, length, "contextdo %d hset %s %s\r\n",
           WD.transID, remoteNode, GetCharArray(data));

  /*
    write
  */
  traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
  LLDblobAppend(self->writeList, &WD, sizeof(writeData));
  status = ANETwrite(self->handle, command, strlen(command));
  free(command);
  DeleteDynString(data);
  if (status < 0) {
    self->connected = 0;
    ConnectRemoteObject(self);
    if (self->connected == 0) {
      if (pCon != NULL) {
        SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected",
                 remoteNode, self->host);
      }
      return hdbAbort;
    }
  }
  return hdbContinue;
}
/*------------------------------------------------------------------------------*/
/**
 * Connect a local node for writing to a node in the remote server: the
 * remotewrite property is set, ROWriteCallback is installed and the type of
 * the remote node is checked against the local one.
 *
 * The message for an already connected node now names the node stored in
 * the remotewrite property, as ConnectRead does for remoteread. The reply
 * buffer is always terminated and a reply without a comma is reported
 * instead of being dereferenced.
 *
 * NOTE(review): as before, the property and the callback stay installed
 * when one of the later checks fails and 0 is returned.
 *
 * @param self the remote object
 * @param pCon connection for messages
 * @param rd   paths of the local and the remote node
 * @return 1 on success, 0 on failure
 */
static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
{
  pHdb localNode = NULL;
  char command[1024], reply[1024], *pPtr;
  int status, type;

  localNode = FindHdbNode(NULL, rd.localNode, pCon);
  if (localNode == NULL) {
    SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
    return 0;
  }

  pPtr = GetHdbProp(localNode, "remotewrite");
  if (pPtr != NULL) {
    SCPrintf(pCon,eError,"ERROR: %s alread connected to %s", rd.localNode,
             pPtr);
    return 0;
  }

  SetHdbProperty(localNode, "remotewrite", rd.remoteNode);
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback,
                                                         self,NULL));
  /*
    TODO: The connected write nodes should be held in a list in order to be able to
    remove the write callbacks when deleting the remote object. As removing remote
    objects usually only happens when SICS shuts down this is not so important.
  */

  /*
   * Get information about the remote node and check compatibility
   */
  memset(reply, 0, sizeof(reply));
  snprintf(command, sizeof(command), "hinfo %s\r\n", rd.remoteNode);
  status = transactCommand(self->transactHandle, command, reply, sizeof(reply)-1);
  if (status != 1) {
    SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
             self->host);
    MarkDisconnected(self);
    return 0;
  }
  if (strstr(reply, "ERROR") != NULL) {
    SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
             reply, rd.remoteNode);
    return 0;
  }

  /* only interested in type: answer is of style: type,nochildren,length */
  pPtr = strchr(reply, ',');
  if (pPtr == NULL) {
    SCPrintf(pCon,eError,"ERROR: cannot parse hinfo reply %s for remote node %s",
             reply, rd.remoteNode);
    return 0;
  }
  *pPtr = '\0';
  type = convertHdbType(reply);
  if (type != localNode->value.dataType) {
    SCPrintf(pCon,eError,
             "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
             rd.localNode, rd.remoteNode, localNode->value.dataType, type);
    return 0;
  }
  return 1;
}
/*---------------------------------------------------------------------------------*/
/**
 * Command function behind the connectwrite node: takes the local and the
 * remote node path from the parameters and calls ConnectWrite.
 *
 * The paths are copied into a zeroed structure leaving room for the
 * terminator, thus over long paths are truncated but stay valid strings.
 *
 * @param ccmd    the SICS object; pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode the command node, not used
 * @param par     parameter nodes: par[0] local path, par[1] remote path
 * @param nPar    number of entries in par
 * @return the status of ConnectWrite, 0 when parameters are missing
 */
static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
                           Hdb * cmdNode, Hdb * par[], int nPar)
{
  ReadData rd;
  int status;
  pRemoteOBJ self;

  if (nPar < 2) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for connectwrite",
            eError);
    return 0;
  }

  /*
   * Initialize....
   */
  memset(&rd, 0, sizeof(rd));
  strncpy(rd.localNode, par[0]->value.v.text, sizeof(rd.localNode) - 1);
  strncpy(rd.remoteNode, par[1]->value.v.text, sizeof(rd.remoteNode) - 1);
  self = (pRemoteOBJ)ccmd->pPrivate;

  status = ConnectWrite(self, pCon, rd);
  if (status == 1) {
    SCSendOK(pCon);
  }
  return status;
}
/*============================ remote execute =================================*/
/**
 * Execute a command in the remote server through transactCommand and write
 * the answer to the connection.
 *
 * The whole answer buffer is cleared and one byte is held back for the
 * terminator. Before, the last byte stayed uninitialised while the full
 * size was handed on, so a long answer could end up unterminated.
 *
 * @param self    the remote object
 * @param pCon    connection receiving the answer or the error message
 * @param command the command to run remotely
 * @return the status of transactCommand, 1 on success
 */
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
{
  int status;
  char answer[65536];

  /*
    write, thereby taking care to prefix with transact and for proper termination
  */
  memset(answer, 0, sizeof(answer));
  status = transactCommand(self->transactHandle, command, answer, sizeof(answer)-1);
  if (status == 1) {
    SCWrite(pCon, answer, eValue);
  } else {
    SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
  }
  return status;
}
/*------------------------------------------------------------------------------*/
/**
 * Command function behind the exe node: the formatted values of all
 * parameters are joined, each preceded by a blank, and the resulting text
 * is run in the remote server with RemoteExecute.
 *
 * @param ccmd    the SICS object; pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode the command node, not used
 * @param par     the parameter nodes making up the command
 * @param nPar    number of entries in par
 * @return the status of RemoteExecute
 */
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
                            Hdb * cmdNode, Hdb * par[], int nPar)
{
  pRemoteOBJ remote = (pRemoteOBJ)ccmd->pPrivate;
  Tcl_DString cmdLine;
  pDynString formatted;
  char *piece;
  int idx = 0, result;

  Tcl_DStringInit(&cmdLine);
  while (idx < nPar) {
    formatted = formatValue(par[idx]->value, par[idx]);
    if (formatted != NULL) {
      Tcl_DStringAppend(&cmdLine, " ", 1);
      piece = GetCharArray(formatted);
      Tcl_DStringAppend(&cmdLine, piece, strlen(piece));
      DeleteDynString(formatted);
    }
    idx++;
  }

  result = RemoteExecute(remote, pCon, Tcl_DStringValue(&cmdLine));
  Tcl_DStringFree(&cmdLine);
  return result;
}
/*============================= connect command ================================*/
/**
 * Command function behind the connect node: tries to connect the remote
 * object and reports the outcome.
 *
 * @param ccmd    the SICS object; pPrivate holds the RemoteOBJ
 * @param pCon    connection for replies
 * @param cmdNode the command node, not used
 * @param par     not used
 * @param nPar    not used
 * @return 1 when connected, 0 otherwise
 */
static int ConnectCmd(pSICSOBJ ccmd, SConnection * pCon,
                      Hdb * cmdNode, Hdb * par[], int nPar)
{
  pRemoteOBJ remote = (pRemoteOBJ)ccmd->pPrivate;

  ConnectRemoteObject(remote);
  if (!remote->connected) {
    SCPrintf(pCon,eError,"ERROR: failed to connect to %s %d", remote->host, remote->port);
    return 0;
  }
  SCSendOK(pCon);
  return 1;
}
/*============================ object initialisation etc =======================*/
/**
 * Factory function of the makeremo command. Creates a SICS object with the
 * commands connectread, connectwrite, exe and connect, tries a first
 * connect and registers the heartbeat and the response task.
 *
 * When one of the allocations fails, everything allocated so far is
 * released again; the host name copy is checked too, as it is used in
 * task names and for connecting.
 *
 * NOTE(review): the results of LLDblobCreate and json_tokener_new are not
 * checked, as before.
 *
 * @param pCon  connection for error messages
 * @param pSics the interpreter to install the command into
 * @param pData not used
 * @param argc  number of arguments
 * @param argv  argv[1] object name, argv[2] remote host, argv[3] remote port
 * @return the status of AddCommand, 0 on errors
 */
static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
                            int argc, char *argv[])
{
  pSICSOBJ pNew = NULL;
  pRemoteOBJ self = NULL;
  int status;
  pHdb cmd;
  char roTaskName[256];

  if (argc < 4) {
    SCWrite(pCon,"ERROR: need name and remote host name and port in order to create remote object",
            eError);
    return 0;
  }

  strtolower(argv[1]);
  if (FindCommand(pSics, argv[1]) != NULL) {
    SCPrintf(pCon,eError, "ERROR: command %s already exists!", argv[1]);
    return 0;
  }

  pNew = MakeSICSOBJ(argv[1], "RemoteOBJ");
  self = calloc(1, sizeof(RemoteOBJ));
  if (self != NULL) {
    self->host = strdup(argv[2]);
  }
  if (pNew == NULL || self == NULL || self->host == NULL) {
    if (self != NULL) {
      free(self->host);
      free(self);
    }
    if (pNew != NULL) {
      /* pPrivate has not been set yet, so only the object is removed */
      KillSICSOBJ(pNew);
    }
    SCWrite(pCon,"ERROR: out of memory creating remote object", eError);
    return 0;
  }
  pNew->pPrivate = self;
  pNew->KillPrivate = KillRemoteOBJ;
  self->port = atoi(argv[3]);
  self->readList = LLDblobCreate();
  self->writeList = LLDblobCreate();
  self->jtok = json_tokener_new();
  ConnectRemoteObject(self);

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connectread", usMugger, MakeSICSFunc(ConnectreadCmd));
  AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
  AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connectwrite", usMugger, MakeSICSFunc(ConnectwriteCmd));
  AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
  AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "exe", usMugger, MakeSICSFunc(RemoteExecuteCmd));
  AddSICSHdbPar(cmd, "args", usMugger, MakeHdbText(""));

  cmd = AddSICSHdbPar(pNew->objectNode,
                      "connect", usMugger, MakeSICSFunc(ConnectCmd));

  status = AddCommand(pSics,
                      argv[1],
                      InterInvokeSICSOBJ,
                      KillSICSOBJ, pNew);

  /* KillRemoteOBJ uses the same name pattern in order to stop the task */
  snprintf(roTaskName, sizeof(roTaskName), "ro-%s-%d", self->host, self->port);
  TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
  snprintf(roTaskName, sizeof(roTaskName), "rocom-%s-%d", self->host, self->port);
  TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
  return status;
}
/*----------------------------------------------------------------------------------------*/
/**
 * Implementation of the addremotecb command which the master sends to the
 * slave. It installs ROUpdateCallback on the node argv[1]; the callback
 * writes its update commands, using argv[2] as node path, to a copy of the
 * calling connection. An update is triggered right away.
 *
 * The connection copy and the path copy are now checked: the callback
 * passes the path to strlen and the connection to SCisConnected and would
 * fail on NULL.
 *
 * @param pCon  the calling connection
 * @param pSics the interpreter, not used
 * @param pData not used
 * @param argc  number of arguments
 * @param argv  argv[1] node in this server, argv[2] node path for the commands
 * @return 1 on success, 0 on errors
 */
static int AddRemoteCallback(SConnection *pCon, SicsInterp *pSics, void *pData,
                             int argc, char *argv[])
{
  pHdb localNode = NULL;
  pUpdateCallback up = NULL;

  if (argc < 3) {
    SCWrite(pCon,"ERROR: need path to local node and remote node for updatecb",
            eError);
    return 0;
  }

  localNode = FindHdbNode(NULL, argv[1], pCon);
  if (localNode == NULL) {
    SCPrintf(pCon,eError,"ERROR: local node %s not found", argv[1]);
    return 0;
  }

  up = malloc(sizeof(UpdateCallback));
  if (up == NULL) {
    SCWrite(pCon,"ERROR: out of memory installing update callback",eError);
    return 0;
  }
  up->sendCon = SCCopyConnection(pCon);
  up->remotePath = strdup(argv[2]);
  if (up->sendCon == NULL || up->remotePath == NULL) {
    if (up->sendCon != NULL) {
      SCDeleteConnection(up->sendCon);
    }
    free(up->remotePath);
    free(up);
    SCWrite(pCon,"ERROR: out of memory installing update callback",eError);
    return 0;
  }
  AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROUpdateCallback,
                                                         up,KillUpdateStruct));

  /**
   * This is meant to send an update immediately such that the remote node
   * is updated right away,
   */
  NotifyHipadabaPar(localNode, NULL);
  SCSendOK(pCon);
  return 1;
}
/*----------------------------------------------------------------------------------------*/
/**
 * Install the two commands of this module into the SICS interpreter:
 * makeremo creates remote objects, addremotecb installs update callbacks.
 */
void RemoteObjectInit(void)
{
  AddCommand(pServ->pSics, "makeremo", MakeRemoteObject, NULL, NULL);
  AddCommand(pServ->pSics, "addremotecb", AddRemoteCallback, NULL, NULL);
}