sub problems were solved: - sicsget was not reporting geterrors on nodes properly - rwpuffer contained dirt after wrap - property change events were added to hipadaba - Some tuning of SICS output - The number of codes was wrong in outcode.c
1009 lines
28 KiB
C
1009 lines
28 KiB
C
/**
|
|
* Remote objects in sicsobj. This means accessing remote objects in a different
|
|
* SICS server from a master SICS server.
|
|
*
|
|
* Reading is implementd according to this scheme:
|
|
*
|
|
* * When a read connection is made between a local node and a remote node in slave, then a
|
|
* callback is installed on remote node in slave.
|
|
* * The callback on remote in slave sends commands to update the local node when either the
|
|
* value or the geterror property changes on the remote node. The commands sent are
|
|
* enclosed in special delimiters
|
|
* * A special ANET callback evaluates the data coming from slave and acts accordingly, thus
|
|
* updating the local node. This is driven by the general network driving code of SICS
|
|
*
|
|
* * in order to detect availability and re-availability of slave a Heartbeat Task sends a
|
|
* heartbeat message to slave. Thereby testing the connection regularly, trying to reconnect etc.
|
|
*
|
|
* COPRYRIGHT: see file COPYRIGHT
|
|
*
|
|
* Mark Koennecke, February 2015
|
|
**/
|
|
#include <unistd.h>
|
|
#include <time.h>
|
|
#include <tcl.h>
|
|
#include <mxml.h>
|
|
#include <sics.h>
|
|
#include <sicsobj.h>
|
|
#include <sicshipadaba.h>
|
|
#include <asynnet.h>
|
|
#include <lld.h>
|
|
#include <lld_blob.h>
|
|
#include <dynstring.h>
|
|
#include <stptok.h>
|
|
|
|
#define OOM -5001 /* out of memory */
|
|
#define TO -5002 /* timeout */
|
|
|
|
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
|
|
extern char *trim(char *txt);
|
|
/*---------------------- our very private data structure -------------------*/
|
|
typedef struct {
|
|
char *host;
|
|
int port;
|
|
int readHandle;
|
|
int writeHandle;
|
|
int writeInUse;
|
|
int readList;
|
|
unsigned int connected;
|
|
time_t nextHeartbeat;
|
|
} RemoteOBJ, *pRemoteOBJ;
|
|
/*----------------------------------------------------------------------------*/
|
|
typedef struct {
|
|
char localNode[1024];
|
|
char remoteNode[1024];
|
|
} ReadData, *pReadData;
|
|
/*---------------------------------------------------------------------------*/
|
|
typedef struct {
|
|
SConnection *sendCon;
|
|
char *remotePath;
|
|
} UpdateCallback, *pUpdateCallback;
|
|
/*----------------------------------------------------------------------------*/
|
|
void KillRemoteOBJ(void *data)
|
|
{
|
|
char roTaskName[132];
|
|
|
|
pRemoteOBJ self = (pRemoteOBJ) data;
|
|
if(data != NULL){
|
|
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
|
|
StopTask(pServ->pTasker,roTaskName);
|
|
free(self->host);
|
|
ANETclose(self->readHandle);
|
|
ANETclose(self->writeHandle);
|
|
LLDdeleteBlob(self->readList);
|
|
}
|
|
}
|
|
/*========================= reading related code ================================*/
|
|
static int RemoteReadCallback(int handle, void *userData)
|
|
{
|
|
int length;
|
|
char *pPtr, *pStart, *pEnd;
|
|
|
|
pPtr = ANETreadPtr(handle,&length);
|
|
|
|
/*
|
|
* deal with command results
|
|
*/
|
|
pStart = strstr(pPtr, "TRANSACTIONSTART");
|
|
pEnd = strstr(pPtr,"TRANSACTIONEND");
|
|
if(pStart != NULL && pEnd != NULL){
|
|
pStart = pStart + strlen("TRANSACTIONSTART");
|
|
*pEnd = '\0';
|
|
traceIO("RO","Received command - reply: %s", pStart);
|
|
pEnd += strlen("TRANSACTIONEND");
|
|
ANETreadConsume(handle,pEnd - pPtr);
|
|
}
|
|
|
|
/*
|
|
* deal with update messages
|
|
*/
|
|
pStart = strstr(pPtr, "SROC:");
|
|
pEnd = strstr(pPtr,":EROC\r\n");
|
|
if(pStart != NULL && pEnd != NULL){
|
|
pStart += strlen("SROC:");
|
|
*pEnd = '\0';
|
|
InterpExecute(pServ->pSics, pServ->dummyCon,pStart);
|
|
traceIO("RO", "Received %s from remote", pStart);
|
|
pEnd += strlen("EROC\r\n");
|
|
ANETreadConsume(handle,pEnd - pPtr);
|
|
}
|
|
|
|
/*
|
|
* deal with heartbeats
|
|
*/
|
|
if((pStart = strstr(pPtr,"Poch")) != NULL){
|
|
ANETreadConsume(handle,(pStart+4) - pPtr);
|
|
}
|
|
|
|
/*
|
|
If there is more stuff to process: recurse
|
|
*/
|
|
pPtr = ANETreadPtr(handle,&length);
|
|
if(length > 0 &&
|
|
( strstr(pPtr,":EROC\r\n") != NULL ||
|
|
strstr(pPtr,"TRANSACTIONEND") != NULL
|
|
|| strstr(pPtr,"Poch") != NULL ) ) {
|
|
RemoteReadCallback(handle,userData);
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static int transactCommand(int handle, char *command, char *reply, int replyLen)
|
|
{
|
|
char *toSend = NULL;
|
|
char *prefix = {"transact "};
|
|
int status, length, type;
|
|
time_t start;
|
|
char *pPtr;
|
|
|
|
/*
|
|
* read possible dirt of the line
|
|
*/
|
|
pPtr = ANETreadPtr(handle,&length);
|
|
ANETreadConsume(handle,length);
|
|
|
|
|
|
toSend = malloc(strlen(command) + strlen(prefix) + 1);
|
|
if(toSend == NULL){
|
|
return OOM;
|
|
}
|
|
strcpy(toSend, prefix);
|
|
strcat(toSend, command);
|
|
status = ANETwrite(handle,toSend,strlen(toSend));
|
|
free(toSend);
|
|
if(status != 1){
|
|
return status;
|
|
}
|
|
|
|
/*
|
|
* wait for a reply for max 2 seconds
|
|
*/
|
|
start = time(NULL);
|
|
while(time(NULL) < start + 2.0){
|
|
ANETprocess();
|
|
pPtr = ANETreadPtr(handle,&length);
|
|
if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){
|
|
strncpy(reply,pPtr,replyLen);
|
|
ANETreadConsume(handle,length);
|
|
return 1;
|
|
}
|
|
usleep(100);
|
|
}
|
|
|
|
/*
|
|
* here we have run into a timeout
|
|
*/
|
|
ANETreadConsume(handle,length);
|
|
return TO;
|
|
|
|
}
|
|
/*----------------------------------------------------------------------------*/
|
|
static void ConnectRemoteObject(pRemoteOBJ self)
|
|
{
|
|
char *pPtr, command[1024];
|
|
int length, status;
|
|
ReadData rd;
|
|
pHdb node;
|
|
|
|
|
|
if(self->connected){
|
|
return;
|
|
}
|
|
|
|
self->readHandle = ANETconnect(self->host, self->port);
|
|
self->writeHandle = ANETconnect(self->host, self->port);
|
|
if(self->readHandle < 0 || self->writeHandle < 0){
|
|
self->connected = 0;
|
|
traceIO("RO","Failed to connect to remote objects at %s, port %d",
|
|
self->host, self->port);
|
|
return;
|
|
}
|
|
traceIO("RO","Connected to %s, port %d for remote objects",
|
|
self->host, self->port);
|
|
|
|
/*
|
|
Default login with hard coded manager login. Defined in
|
|
nserver.c
|
|
*/
|
|
ANETwrite(self->readHandle,login,strlen(login));
|
|
ANETwrite(self->writeHandle,login,strlen(login));
|
|
usleep(500);
|
|
ANETprocess();
|
|
/*
|
|
eat the login responses
|
|
*/
|
|
pPtr = ANETreadPtr(self->readHandle, &length);
|
|
ANETreadConsume(self->readHandle,length);
|
|
pPtr = ANETreadPtr(self->writeHandle, &length);
|
|
ANETreadConsume(self->writeHandle,length);
|
|
|
|
|
|
/*
|
|
* install the read callback
|
|
*/
|
|
ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL);
|
|
|
|
/*
|
|
* Remove geterror on read nodes and reinstall callbacks for reconnects
|
|
*/
|
|
status = LLDnodePtr2First(self->readList);
|
|
while(status != 0) {
|
|
LLDblobData(self->readList,&rd);
|
|
node = FindHdbNode(NULL,rd.localNode,NULL);
|
|
if(node != NULL){
|
|
SetHdbProperty(node,"geterror",NULL);
|
|
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n",
|
|
rd.remoteNode, rd.localNode);
|
|
ANETwrite(self->readHandle,command,strlen(command));
|
|
}
|
|
status = LLDnodePtr2Next(self->readList);
|
|
}
|
|
|
|
transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command));
|
|
|
|
self->connected = 1;
|
|
self->writeInUse = 0;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static void MarkDisconnected(pRemoteOBJ self)
|
|
{
|
|
int status;
|
|
ReadData rd;
|
|
pHdb node;
|
|
|
|
status = LLDnodePtr2First(self->readList);
|
|
while(status != 0) {
|
|
LLDblobData(self->readList,&rd);
|
|
node = FindHdbNode(NULL,rd.localNode,NULL);
|
|
if(node != NULL){
|
|
SetHdbProperty(node,"geterror","Disconnected from remote server");
|
|
}
|
|
status = LLDnodePtr2Next(self->readList);
|
|
}
|
|
self->connected = 0;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
|
|
pHdbMessage mes)
|
|
{
|
|
pUpdateCallback uppi = (pUpdateCallback)userData;
|
|
hdbDataMessage *mm = NULL;
|
|
pDynString text;
|
|
char *prefix = {"SROC:hupdate "};
|
|
char *postfix= {":EROC\r\n"};
|
|
char *txt = NULL;
|
|
int length;
|
|
pHdbPropertyChange propChange = NULL;
|
|
|
|
mm = GetHdbUpdateMessage(mes);
|
|
if(mm != NULL){
|
|
/*
|
|
* remove myself when the connection is dead...
|
|
*/
|
|
if(!SCisConnected(uppi->sendCon)){
|
|
return hdbKill;
|
|
}
|
|
/*
|
|
* format and send the update command to master
|
|
*/
|
|
text = formatValue(*(mm->v), currentNode);
|
|
length = GetDynStringLength(text) +
|
|
strlen(prefix) + strlen(postfix) + strlen(uppi->remotePath) +5;
|
|
txt = malloc(length*sizeof(char));
|
|
if(txt == NULL){
|
|
return hdbContinue;
|
|
}
|
|
snprintf(txt,length,"%s %s %s %s", prefix, uppi->remotePath,
|
|
GetCharArray(text), postfix);
|
|
SCWrite(uppi->sendCon,txt,eValue);
|
|
free(txt);
|
|
DeleteDynString(text);
|
|
}
|
|
|
|
propChange = GetPropertyChangeMessage(mes);
|
|
if(propChange != NULL){
|
|
/*
|
|
* remove myself when the connection is dead...
|
|
*/
|
|
if(!SCisConnected(uppi->sendCon)){
|
|
return hdbKill;
|
|
}
|
|
length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) +
|
|
strlen(propChange->key) + 10;
|
|
if(propChange->value != NULL){
|
|
length += strlen(propChange->value);
|
|
}
|
|
txt = malloc(length*sizeof(char));
|
|
if(txt == NULL){
|
|
return hdbContinue;
|
|
}
|
|
if(propChange->value == NULL){
|
|
snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath,
|
|
propChange->key,postfix);
|
|
} else {
|
|
snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath,
|
|
propChange->key,propChange->value, postfix);
|
|
}
|
|
SCWrite(uppi->sendCon,txt,eValue);
|
|
free(txt);
|
|
}
|
|
|
|
return hdbContinue;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static hdbCallbackReturn GetErrorCallback(pHdb currentNode, void *userData,
|
|
pHdbMessage mes)
|
|
{
|
|
hdbDataMessage *mm= NULL;
|
|
char *geterror, error[512];
|
|
SConnection *con = NULL;
|
|
|
|
|
|
mm = GetHdbGetMessage(mes);
|
|
if (mm != NULL) {
|
|
con = mm->callData;
|
|
geterror = GetHdbProp(currentNode, "geterror");
|
|
if (geterror != NULL) {
|
|
snprintf(error,sizeof(error),"ERROR: %s", geterror);
|
|
SCWrite(con, error, eError);
|
|
if (mm->v->dataType == HIPTEXT) {
|
|
if (mm->v->v.text != NULL) {
|
|
free(mm->v->v.text);
|
|
}
|
|
mm->v->v.text = strdup(error);
|
|
}
|
|
return hdbAbort;
|
|
}
|
|
}
|
|
return hdbContinue;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static void KillUpdateStruct(void *data)
|
|
{
|
|
pUpdateCallback self = (pUpdateCallback)data;
|
|
if(data != NULL){
|
|
SCDeleteConnection(self->sendCon);
|
|
free(self->remotePath);
|
|
free(self);
|
|
}
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
|
|
{
|
|
char command[1024], reply[1024], *pPtr;
|
|
int status, type;
|
|
pHdb localNode = NULL;
|
|
|
|
/*
|
|
* Initialize....
|
|
*/
|
|
localNode = FindHdbNode(NULL,rd.localNode, pCon);
|
|
if(localNode == NULL){
|
|
SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Refuse duplicate connections
|
|
*/
|
|
pPtr = GetHdbProp(localNode,"remoteread");
|
|
if(pPtr != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: %s is already connected to %s", rd.localNode, pPtr);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Get information about the remote node and check compatability
|
|
*/
|
|
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
|
|
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
|
|
if(status != 1){
|
|
/*
|
|
* try a reconnect,
|
|
* when fails:
|
|
* Warning
|
|
* add blob
|
|
*/
|
|
self->connected = 0;
|
|
ConnectRemoteObject(self);
|
|
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
|
|
if(status != 1){
|
|
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
|
|
self->host);
|
|
MarkDisconnected(self);
|
|
LLDblobAdd(self->readList,&rd,sizeof(rd));
|
|
AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
|
|
NULL,NULL));
|
|
SetHdbProperty(localNode,"remoteread",rd.remoteNode);
|
|
return 1;
|
|
}
|
|
}
|
|
if(strstr(reply, "ERROR") != NULL){
|
|
SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
|
|
reply, rd.remoteNode);
|
|
return 0;
|
|
}
|
|
/* only interested in type: answer is of style: type,nochildren,length */
|
|
pPtr = strchr(reply,',');
|
|
*pPtr= '\0';
|
|
type = convertHdbType(reply);
|
|
if(type != localNode->value.dataType){
|
|
SCPrintf(pCon,eError,
|
|
"ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
|
|
rd.localNode, rd.remoteNode, localNode->value.dataType, type);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Make an entry in the read list
|
|
*/
|
|
LLDblobAdd(self->readList,&rd,sizeof(rd));
|
|
AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback,
|
|
NULL,NULL));
|
|
|
|
SetHdbProperty(localNode,"remoteread",rd.remoteNode);
|
|
|
|
/*
|
|
* Install a callback on the remote node to update the master. The remote should
|
|
* then immediatly send an update which will be processed by the read callback.
|
|
*/
|
|
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n",
|
|
rd.remoteNode, rd.localNode);
|
|
ANETwrite(self->readHandle,command,strlen(command));
|
|
|
|
return 1;
|
|
}
|
|
|
|
/*-----------------------------------------------------------------------------*/
|
|
static int ConnectreadCmd(pSICSOBJ ccmd, SConnection * pCon,
|
|
Hdb * cmdNode, Hdb * par[], int nPar)
|
|
{
|
|
ReadData rd;
|
|
pHdb localNode = NULL;
|
|
char command[1024], reply[1024], *pPtr;
|
|
int status, type;
|
|
pRemoteOBJ self;
|
|
|
|
|
|
if(nPar < 2) {
|
|
SCWrite(pCon,"ERROR: need path to local node and remote node for connectread",
|
|
eError);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Initialize....
|
|
*/
|
|
strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode));
|
|
strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode));
|
|
self = (pRemoteOBJ)ccmd->pPrivate;
|
|
|
|
status = ConnectRead(self,pCon,rd);
|
|
|
|
if(status == 1){
|
|
SCSendOK(pCon);
|
|
}
|
|
|
|
return status;
|
|
}
|
|
/*-----------------------------------------------------------------------------*/
|
|
static int HeartbeatTask(void *pData)
|
|
{
|
|
pRemoteOBJ self = (pRemoteOBJ)pData;
|
|
int status;
|
|
char command[] = {"Poch\r\n"};
|
|
|
|
if (time(NULL) > self->nextHeartbeat){
|
|
status = ANETwrite(self->readHandle,command, strlen(command));
|
|
if(status != 1){
|
|
traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port);
|
|
self->connected = 0;
|
|
ConnectRemoteObject(self);
|
|
if(!self->connected){
|
|
MarkDisconnected(self);
|
|
}
|
|
}
|
|
self->nextHeartbeat = time(NULL) + 10;
|
|
}
|
|
return 1;
|
|
}
|
|
/*============================= writing related code ===========================
|
|
The logic here is to use the standard writeHandle when available. I expect most
|
|
communication to be short and to happen through the writeHandle. If that one is
|
|
in use, a new connection will be built.
|
|
---------------------------------------------------------------------------------
|
|
suppress all superfluous OK from the slave
|
|
-----------------------------------------------------------------------------------*/
|
|
#include <outcode.c>
|
|
static OutCode findOutCode(char *txt)
|
|
{
|
|
int i;
|
|
|
|
for(i = 0; i < iNoCodes; i++){
|
|
if(strstr(txt,pCode[i]) != NULL){
|
|
return i;
|
|
}
|
|
}
|
|
return eValue;
|
|
}
|
|
/*--------------------------------------------------------------------------------*/
|
|
static void printSICS(char *answer, SConnection *pCon)
|
|
{
|
|
char line[1024], *pPtr, *pCode;
|
|
OutCode eCode;
|
|
|
|
pPtr = answer;
|
|
while(pPtr != NULL){
|
|
memset(line,0,sizeof(line));
|
|
pPtr = stptok(pPtr,line,sizeof(line),"\n");
|
|
if(strstr(line,"OK") == NULL){
|
|
pCode = strstr(line,"@@");
|
|
if(pCode != NULL){
|
|
*pCode = '\0';
|
|
pCode += 2;
|
|
eCode = findOutCode(trim(pCode));
|
|
|
|
} else {
|
|
eCode = eValue;
|
|
}
|
|
SCWrite(pCon,line,eCode);
|
|
}
|
|
}
|
|
}
|
|
/*---------------------------------------------------------------------------------*/
|
|
static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle)
|
|
{
|
|
int handle, length;
|
|
char *answer = NULL;
|
|
char command[80];
|
|
|
|
if(self->writeInUse) {
|
|
handle = ANETconnect(self->host,self->port);
|
|
if(handle < 0){
|
|
traceIO("RO","Failed to connect to %s at %d", self->host, self->port);
|
|
if(pCon != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port);
|
|
}
|
|
return handle;
|
|
}
|
|
ANETwrite(handle,login,strlen(login));
|
|
usleep(500);
|
|
ANETprocess();
|
|
/*
|
|
eat the login responses
|
|
*/
|
|
answer = ANETreadPtr(handle, &length);
|
|
ANETreadConsume(handle,length);
|
|
*newHandle = 1;
|
|
|
|
transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command));
|
|
|
|
} else {
|
|
self->writeInUse = 1;
|
|
handle = self->writeHandle;
|
|
/*
|
|
eat dirt from the line
|
|
*/
|
|
answer = ANETreadPtr(handle, &length);
|
|
ANETreadConsume(handle,length);
|
|
}
|
|
return handle;
|
|
}
|
|
/*---------------------------------------------------------------------------------*/
|
|
static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon)
|
|
{
|
|
char *answer = NULL, *pEnd, *command = NULL;
|
|
int length;
|
|
|
|
while(1){
|
|
TaskYield(pServ->pTasker);
|
|
if(!ANETvalidHandle(handle)){
|
|
SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host);
|
|
break;
|
|
}
|
|
answer = ANETreadPtr(handle,&length);
|
|
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
|
|
if(pCon != NULL){
|
|
*pEnd = '\0';
|
|
printSICS(answer,pCon);
|
|
}
|
|
traceIO("RO","%s:%d: Received %s", self->host, self->port,answer);
|
|
ANETreadConsume(handle,length);
|
|
break;
|
|
}
|
|
}
|
|
|
|
}
|
|
/*---------------------------------------------------------------------------------*/
|
|
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
|
|
pHdbMessage mes)
|
|
{
|
|
pHdbDataMessage mm = NULL;
|
|
int handle, status, length, newHandle = 0;
|
|
pRemoteOBJ self = (pRemoteOBJ)userData;
|
|
SConnection *pCon = NULL;
|
|
pDynString data;
|
|
char *remoteNode;
|
|
char *command, *answer, *pEnd;
|
|
|
|
|
|
if((mm = GetHdbSetMessage(mes)) != NULL){
|
|
pCon = (SConnection *)mm->callData;
|
|
handle = PrepareWriteHandle(self,pCon,&newHandle);
|
|
if(handle < 0){
|
|
return hdbAbort;
|
|
}
|
|
|
|
/*
|
|
build the command to send
|
|
*/
|
|
data = formatValue(*(mm->v),currentNode);
|
|
remoteNode = GetHdbProp(currentNode,"remotewrite");
|
|
length = 40 + strlen(remoteNode) + GetDynStringLength(data);
|
|
command = malloc(length*sizeof(char));
|
|
if(command == NULL){
|
|
if(pCon != NULL){
|
|
SCWrite(pCon,"ERROR: out of memory writing remote node",eError);
|
|
}
|
|
return hdbAbort;
|
|
}
|
|
snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data));
|
|
|
|
/*
|
|
write
|
|
*/
|
|
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
|
|
status = ANETwrite(handle,command,strlen(command));
|
|
free(command);
|
|
DeleteDynString(data);
|
|
if(status < 0){
|
|
if(pCon != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host);
|
|
}
|
|
return hdbAbort;
|
|
}
|
|
|
|
/*
|
|
wait for a response: TRANSACTIONFINISHED
|
|
*/
|
|
ProcessWriteResponse(self,handle,pCon);
|
|
|
|
/*
|
|
Is there a termination script?
|
|
*/
|
|
command = GetHdbProp(currentNode,"termscript");
|
|
if(command != NULL){
|
|
while(1) {
|
|
TaskYield(pServ->pTasker);
|
|
Tcl_Eval(InterpGetTcl(pServ->pSics),command);
|
|
answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics));
|
|
if(strstr(answer,"idle") != NULL){
|
|
answer = ANETreadPtr(handle,&length);
|
|
printSICS(answer,pCon);
|
|
traceIO("RO","%s:%d:Received %s", self->host,self->port,answer);
|
|
ANETreadConsume(handle,length);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
if(newHandle){
|
|
ANETclose(handle);
|
|
} else {
|
|
self->writeInUse = 0;
|
|
}
|
|
|
|
}
|
|
|
|
return hdbContinue;
|
|
}
|
|
/*------------------------------------------------------------------------------*/
|
|
static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
|
|
{
|
|
pHdb localNode = NULL;
|
|
char command[1024], reply[1024], *pPtr;
|
|
int status, type;
|
|
|
|
localNode = FindHdbNode(NULL,rd.localNode, pCon);
|
|
if(localNode == NULL){
|
|
SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode);
|
|
return 0;
|
|
}
|
|
|
|
pPtr = GetHdbProp(localNode,"remotewrite");
|
|
if(pPtr != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: %s alread connected to %s", rd.localNode,
|
|
rd.remoteNode);
|
|
return 0;
|
|
}
|
|
|
|
SetHdbProperty(localNode,"remotewrite",rd.remoteNode);
|
|
AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback,
|
|
self,NULL));
|
|
|
|
/*
|
|
* Get information about the remote node and check compatability
|
|
*/
|
|
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
|
|
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
|
|
if(status != 1){
|
|
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
|
|
self->host);
|
|
MarkDisconnected(self);
|
|
return 0;
|
|
}
|
|
if(strstr(reply, "ERROR") != NULL){
|
|
SCPrintf(pCon,eError,"%s while trying to contact remote node %s",
|
|
reply, rd.remoteNode);
|
|
return 0;
|
|
}
|
|
/* only interested in type: answer is of style: type,nochildren,length */
|
|
pPtr = strchr(reply,',');
|
|
*pPtr= '\0';
|
|
type = convertHdbType(reply);
|
|
if(type != localNode->value.dataType){
|
|
SCPrintf(pCon,eError,
|
|
"ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d",
|
|
rd.localNode, rd.remoteNode, localNode->value.dataType, type);
|
|
return 0;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
/*---------------------------------------------------------------------------------*/
|
|
static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
|
|
Hdb * cmdNode, Hdb * par[], int nPar)
|
|
{
|
|
ReadData rd;
|
|
int status;
|
|
pRemoteOBJ self;
|
|
|
|
|
|
if(nPar < 2) {
|
|
SCWrite(pCon,"ERROR: need path to local node and remote node for connectwrite",
|
|
eError);
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Initialize....
|
|
*/
|
|
strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode));
|
|
strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode));
|
|
self = (pRemoteOBJ)ccmd->pPrivate;
|
|
|
|
status = ConnectWrite(self,pCon,rd);
|
|
|
|
if(status == 1){
|
|
SCSendOK(pCon);
|
|
}
|
|
return status;
|
|
}
|
|
/*============================ remote execute =================================*/
|
|
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
|
|
{
|
|
int status, handle, newHandle = 0, length;
|
|
char *answer, *pEnd;
|
|
|
|
handle = PrepareWriteHandle(self,pCon,&newHandle);
|
|
if(handle < 0){
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
write, thereby taking care to prefix with transact and for proper termination
|
|
*/
|
|
if(strstr(command,"transact") == NULL){
|
|
ANETwrite(handle,"transact ", sizeof("transact "));
|
|
}
|
|
status = ANETwrite(handle,command,strlen(command));
|
|
if(strstr(command,"\n") == NULL){
|
|
ANETwrite(handle,"\r\n",2);
|
|
}
|
|
if(status < 0){
|
|
traceIO("RO","Disconnect from %s while executing %s", self->host, command);
|
|
if(pCon != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
wait for response
|
|
*/
|
|
while(1){
|
|
TaskYield(pServ->pTasker);
|
|
if(!ANETvalidHandle(handle)){
|
|
if(pCon != NULL){
|
|
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
|
|
}
|
|
break;
|
|
}
|
|
answer = ANETreadPtr(handle,&length);
|
|
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
|
|
if(pCon != NULL){
|
|
*pEnd = '\0';
|
|
SCPrintf(pCon,eValue,answer);
|
|
}
|
|
ANETreadConsume(handle,length);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if(newHandle){
|
|
ANETclose(handle);
|
|
} else {
|
|
self->writeInUse = 0;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
/*------------------------------------------------------------------------------*/
|
|
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
|
|
Hdb * cmdNode, Hdb * par[], int nPar)
|
|
{
|
|
int status, i;
|
|
char *pPtr;
|
|
Tcl_DString com;
|
|
pDynString val;
|
|
pRemoteOBJ self;
|
|
|
|
self = (pRemoteOBJ)ccmd->pPrivate;
|
|
|
|
Tcl_DStringInit(&com);
|
|
for (i = 0; i < nPar; i++) {
|
|
val = formatValue(par[i]->value, par[i]);
|
|
if (val != NULL) {
|
|
Tcl_DStringAppend(&com, " ", 1);
|
|
pPtr = GetCharArray(val);
|
|
Tcl_DStringAppend(&com, pPtr, strlen(pPtr));
|
|
DeleteDynString(val);
|
|
}
|
|
}
|
|
|
|
status = RemoteExecute(self,pCon,Tcl_DStringValue(&com));
|
|
Tcl_DStringFree(&com);
|
|
|
|
return status;
|
|
|
|
}
|
|
/*============================= connect command ================================*/
|
|
static int ConnectCmd(pSICSOBJ ccmd, SConnection * pCon,
|
|
Hdb * cmdNode, Hdb * par[], int nPar)
|
|
{
|
|
pRemoteOBJ self;
|
|
|
|
self = (pRemoteOBJ)ccmd->pPrivate;
|
|
ConnectRemoteObject(self);
|
|
|
|
if(self->connected){
|
|
SCSendOK(pCon);
|
|
} else {
|
|
SCPrintf(pCon,eError,"ERROR: failed to connect to %s %d", self->host, self->port);
|
|
return 0;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
/*============================ object initialisation etc =======================*/
|
|
static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
|
|
int argc, char *argv[])
|
|
{
|
|
pSICSOBJ pNew = NULL;
|
|
pRemoteOBJ self = NULL;
|
|
int status;
|
|
pHdb cmd;
|
|
char roTaskName[256];
|
|
|
|
if(argc < 4) {
|
|
SCWrite(pCon,"ERROR: need name and remote host name and port in order to create remote object",
|
|
eError);
|
|
return 0;
|
|
}
|
|
|
|
strtolower(argv[1]);
|
|
if(FindCommand(pSics,argv[1]) != NULL){
|
|
SCPrintf(pCon,eError, "ERROR: command %s already exists!", argv[1]);
|
|
return 0;
|
|
}
|
|
|
|
pNew = MakeSICSOBJ(argv[1],"RemoteOBJ");
|
|
self = calloc(1, sizeof(RemoteOBJ));
|
|
if(pNew == NULL || self == NULL){
|
|
SCWrite(pCon,"ERROR: out of memory creating remote object", eError);
|
|
return 0;
|
|
}
|
|
pNew->pPrivate = self;
|
|
pNew->KillPrivate = KillRemoteOBJ;
|
|
self->host = strdup(argv[2]);
|
|
self->port = atoi(argv[3]);
|
|
self->readList = LLDblobCreate();
|
|
ConnectRemoteObject(self);
|
|
|
|
cmd = AddSICSHdbPar(pNew->objectNode,
|
|
"connectread", usMugger, MakeSICSFunc(ConnectreadCmd));
|
|
AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
|
|
AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));
|
|
|
|
|
|
cmd = AddSICSHdbPar(pNew->objectNode,
|
|
"connectwrite", usMugger, MakeSICSFunc(ConnectwriteCmd));
|
|
AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText(""));
|
|
AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText(""));
|
|
|
|
|
|
cmd = AddSICSHdbPar(pNew->objectNode,
|
|
"exe", usMugger, MakeSICSFunc(RemoteExecuteCmd));
|
|
AddSICSHdbPar(cmd, "args", usMugger, MakeHdbText(""));
|
|
|
|
cmd = AddSICSHdbPar(pNew->objectNode,
|
|
"connect", usMugger, MakeSICSFunc(ConnectCmd));
|
|
|
|
|
|
status = AddCommand(pSics,
|
|
argv[1],
|
|
InterInvokeSICSOBJ,
|
|
KillSICSOBJ, pNew);
|
|
|
|
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
|
|
TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
|
|
|
|
return status;
|
|
}
|
|
/*----------------------------------------------------------------------------------------*/
|
|
static int AddRemoteCallback(SConnection *pCon, SicsInterp *pSics, void *pData,
|
|
int argc, char *argv[])
|
|
{
|
|
pHdb localNode = NULL;
|
|
pUpdateCallback up = NULL;
|
|
|
|
if(argc < 3) {
|
|
SCWrite(pCon,"ERROR: need path to local node and remote node for updatecb",
|
|
eError);
|
|
return 0;
|
|
}
|
|
|
|
localNode = FindHdbNode(NULL,argv[1], pCon);
|
|
if(localNode == NULL){
|
|
SCPrintf(pCon,eError,"ERROR: local node %s not found", argv[1]);
|
|
return 0;
|
|
}
|
|
|
|
up = malloc(sizeof(UpdateCallback));
|
|
if(up == NULL){
|
|
SCWrite(pCon,"ERROR: out of memory installing update callback",eError);
|
|
return 0;
|
|
}
|
|
up->sendCon = SCCopyConnection(pCon);
|
|
up->remotePath = strdup(argv[2]);
|
|
AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROUpdateCallback,
|
|
up,KillUpdateStruct));
|
|
/**
|
|
* This is meant to send an update immediatly such that the remote node
|
|
* is updated right away,
|
|
*/
|
|
NotifyHipadabaPar(localNode, NULL);
|
|
|
|
SCSendOK(pCon);
|
|
return 1;
|
|
|
|
}
|
|
/*----------------------------------------------------------------------------------------*/
|
|
void RemoteObjectInit(void)
|
|
{
|
|
|
|
AddCommand(pServ->pSics,
|
|
"makeremo",
|
|
MakeRemoteObject,
|
|
NULL,NULL);
|
|
|
|
AddCommand(pServ->pSics,
|
|
"addremotecb",
|
|
AddRemoteCallback,
|
|
NULL,NULL);
|
|
|
|
|
|
}
|