/** * Remote objects in sicsobj. This means accessing remote objects in a different * SICS server from a master SICS server. * * Reading is implementd according to this scheme: * * * When a read connection is made between a local node and a remote node in slave, then a * callback is installed on remote node in slave. * * The callback on remote in slave sends commands to update the local node when either the * value or the geterror property changes on the remote node. The commands sent are * enclosed in special delimiters * * A special ANET callback evaluates the data coming from slave and acts accordingly, thus * updating the local node. This is driven by the general network driving code of SICS * * * in order to detect availability and re-availability of slave a Heartbeat Task sends a * heartbeat message to slave. Thereby testing the connection regularly, trying to reconnect etc. * * COPRYRIGHT: see file COPYRIGHT * * Mark Koennecke, February 2015 **/ #include #include #include #include #include #include #include #include #include #include #include #include #define OOM -5001 /* out of memory */ #define TO -5002 /* timeout */ static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; extern char *trim(char *txt); /*---------------------- our very private data structure -------------------*/ typedef struct { char *host; int port; int readHandle; int writeHandle; int writeInUse; int readList; unsigned int connected; time_t nextHeartbeat; } RemoteOBJ, *pRemoteOBJ; /*----------------------------------------------------------------------------*/ typedef struct { char localNode[1024]; char remoteNode[1024]; } ReadData, *pReadData; /*---------------------------------------------------------------------------*/ typedef struct { SConnection *sendCon; char *remotePath; } UpdateCallback, *pUpdateCallback; /*----------------------------------------------------------------------------*/ void KillRemoteOBJ(void *data) { char roTaskName[132]; pRemoteOBJ self = (pRemoteOBJ) data; if(data != NULL){ snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); StopTask(pServ->pTasker,roTaskName); free(self->host); ANETclose(self->readHandle); ANETclose(self->writeHandle); LLDdeleteBlob(self->readList); } } /*========================= reading related code ================================*/ static int RemoteReadCallback(int handle, void *userData) { int length; char *pPtr, *pStart, *pEnd; pPtr = ANETreadPtr(handle,&length); /* * deal with command results */ pStart = strstr(pPtr, "TRANSACTIONSTART"); pEnd = strstr(pPtr,"TRANSACTIONEND"); if(pStart != NULL && pEnd != NULL){ pStart = pStart + strlen("TRANSACTIONSTART"); *pEnd = '\0'; traceIO("RO","Received command - reply: %s", pStart); pEnd += strlen("TRANSACTIONEND"); ANETreadConsume(handle,pEnd - pPtr); } /* * deal with update messages */ pStart = strstr(pPtr, "SROC:"); pEnd = strstr(pPtr,":EROC\r\n"); if(pStart != NULL && pEnd != NULL){ pStart += strlen("SROC:"); *pEnd = '\0'; InterpExecute(pServ->pSics, pServ->dummyCon,pStart); traceIO("RO", "Received %s from remote", pStart); pEnd += strlen("EROC\r\n"); ANETreadConsume(handle,pEnd - pPtr); } /* * deal with heartbeats */ if((pStart = strstr(pPtr,"Poch")) != NULL){ ANETreadConsume(handle,(pStart+4) - pPtr); } /* If there is more stuff to process: recurse */ pPtr = ANETreadPtr(handle,&length); if(length > 0 && ( strstr(pPtr,":EROC\r\n") != NULL || strstr(pPtr,"TRANSACTIONEND") != NULL || strstr(pPtr,"Poch") != NULL ) ) { RemoteReadCallback(handle,userData); } return 1; } /*-----------------------------------------------------------------------------*/ static int transactCommand(int handle, char *command, char *reply, int replyLen) { char *toSend = NULL; char *prefix = {"transact "}; int status, length, type; time_t start; char *pPtr; /* * read possible dirt of the line */ pPtr = ANETreadPtr(handle,&length); ANETreadConsume(handle,length); toSend = malloc(strlen(command) + strlen(prefix) + 1); if(toSend == NULL){ return OOM; } strcpy(toSend, prefix); strcat(toSend, command); status = ANETwrite(handle,toSend,strlen(toSend)); free(toSend); if(status != 1){ return status; } /* * wait for a reply for max 2 seconds */ start = time(NULL); while(time(NULL) < start + 2.0){ ANETprocess(); pPtr = ANETreadPtr(handle,&length); if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ strncpy(reply,pPtr,replyLen); ANETreadConsume(handle,length); return 1; } usleep(100); } /* * here we have run into a timeout */ ANETreadConsume(handle,length); return TO; } /*----------------------------------------------------------------------------*/ static void ConnectRemoteObject(pRemoteOBJ self) { char *pPtr, command[1024]; int length, status; ReadData rd; pHdb node; if(self->connected){ return; } self->readHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port); if(self->readHandle < 0 || self->writeHandle < 0){ self->connected = 0; traceIO("RO","Failed to connect to remote objects at %s, port %d", self->host, self->port); return; } traceIO("RO","Connected to %s, port %d for remote objects", self->host, self->port); /* Default login with hard coded manager login. Defined in nserver.c */ ANETwrite(self->readHandle,login,strlen(login)); ANETwrite(self->writeHandle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ pPtr = ANETreadPtr(self->readHandle, &length); ANETreadConsume(self->readHandle,length); pPtr = ANETreadPtr(self->writeHandle, &length); ANETreadConsume(self->writeHandle,length); /* * install the read callback */ ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); /* * Remove geterror on read nodes and reinstall callbacks for reconnects */ status = LLDnodePtr2First(self->readList); while(status != 0) { LLDblobData(self->readList,&rd); node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror",NULL); snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", rd.remoteNode, rd.localNode); ANETwrite(self->readHandle,command,strlen(command)); } status = LLDnodePtr2Next(self->readList); } transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)); self->connected = 1; self->writeInUse = 0; } /*-----------------------------------------------------------------------------*/ static void MarkDisconnected(pRemoteOBJ self) { int status; ReadData rd; pHdb node; status = LLDnodePtr2First(self->readList); while(status != 0) { LLDblobData(self->readList,&rd); node = FindHdbNode(NULL,rd.localNode,NULL); if(node != NULL){ SetHdbProperty(node,"geterror","Disconnected from remote server"); } status = LLDnodePtr2Next(self->readList); } self->connected = 0; } /*-----------------------------------------------------------------------------*/ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, pHdbMessage mes) { pUpdateCallback uppi = (pUpdateCallback)userData; hdbDataMessage *mm = NULL; pDynString text; char *prefix = {"SROC:hupdate "}; char *postfix= {":EROC\r\n"}; char *txt = NULL; int length; pHdbPropertyChange propChange = NULL; mm = GetHdbUpdateMessage(mes); if(mm != NULL){ /* * remove myself when the connection is dead... */ if(!SCisConnected(uppi->sendCon)){ return hdbKill; } /* * format and send the update command to master */ text = formatValue(*(mm->v), currentNode); length = GetDynStringLength(text) + strlen(prefix) + strlen(postfix) + strlen(uppi->remotePath) +5; txt = malloc(length*sizeof(char)); if(txt == NULL){ return hdbContinue; } snprintf(txt,length,"%s %s %s %s", prefix, uppi->remotePath, GetCharArray(text), postfix); SCWrite(uppi->sendCon,txt,eValue); free(txt); DeleteDynString(text); } propChange = GetPropertyChangeMessage(mes); if(propChange != NULL){ /* * remove myself when the connection is dead... */ if(!SCisConnected(uppi->sendCon)){ return hdbKill; } length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + strlen(propChange->key) + 10; if(propChange->value != NULL){ length += strlen(propChange->value); } txt = malloc(length*sizeof(char)); if(txt == NULL){ return hdbContinue; } if(propChange->value == NULL){ snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, propChange->key,postfix); } else { snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, propChange->key,propChange->value, postfix); } SCWrite(uppi->sendCon,txt,eValue); free(txt); } return hdbContinue; } /*-----------------------------------------------------------------------------*/ static hdbCallbackReturn GetErrorCallback(pHdb currentNode, void *userData, pHdbMessage mes) { hdbDataMessage *mm= NULL; char *geterror, error[512]; SConnection *con = NULL; mm = GetHdbGetMessage(mes); if (mm != NULL) { con = mm->callData; geterror = GetHdbProp(currentNode, "geterror"); if (geterror != NULL) { snprintf(error,sizeof(error),"ERROR: %s", geterror); SCWrite(con, error, eError); if (mm->v->dataType == HIPTEXT) { if (mm->v->v.text != NULL) { free(mm->v->v.text); } mm->v->v.text = strdup(error); } return hdbAbort; } } return hdbContinue; } /*-----------------------------------------------------------------------------*/ static void KillUpdateStruct(void *data) { pUpdateCallback self = (pUpdateCallback)data; if(data != NULL){ SCDeleteConnection(self->sendCon); free(self->remotePath); free(self); } } /*-----------------------------------------------------------------------------*/ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) { char command[1024], reply[1024], *pPtr; int status, type; pHdb localNode = NULL; /* * Initialize.... */ localNode = FindHdbNode(NULL,rd.localNode, pCon); if(localNode == NULL){ SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode); return 0; } /** * Refuse duplicate connections */ pPtr = GetHdbProp(localNode,"remoteread"); if(pPtr != NULL){ SCPrintf(pCon,eError,"ERROR: %s is already connected to %s", rd.localNode, pPtr); return 0; } /* * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); if(status != 1){ /* * try a reconnect, * when fails: * Warning * add blob */ self->connected = 0; ConnectRemoteObject(self); status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); MarkDisconnected(self); LLDblobAdd(self->readList,&rd,sizeof(rd)); AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback, NULL,NULL)); SetHdbProperty(localNode,"remoteread",rd.remoteNode); return 1; } } if(strstr(reply, "ERROR") != NULL){ SCPrintf(pCon,eError,"%s while trying to contact remote node %s", reply, rd.remoteNode); return 0; } /* only interested in type: answer is of style: type,nochildren,length */ pPtr = strchr(reply,','); *pPtr= '\0'; type = convertHdbType(reply); if(type != localNode->value.dataType){ SCPrintf(pCon,eError, "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d", rd.localNode, rd.remoteNode, localNode->value.dataType, type); return 0; } /* * Make an entry in the read list */ LLDblobAdd(self->readList,&rd,sizeof(rd)); AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback, NULL,NULL)); SetHdbProperty(localNode,"remoteread",rd.remoteNode); /* * Install a callback on the remote node to update the master. The remote should * then immediatly send an update which will be processed by the read callback. */ snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", rd.remoteNode, rd.localNode); ANETwrite(self->readHandle,command,strlen(command)); return 1; } /*-----------------------------------------------------------------------------*/ static int ConnectreadCmd(pSICSOBJ ccmd, SConnection * pCon, Hdb * cmdNode, Hdb * par[], int nPar) { ReadData rd; pHdb localNode = NULL; char command[1024], reply[1024], *pPtr; int status, type; pRemoteOBJ self; if(nPar < 2) { SCWrite(pCon,"ERROR: need path to local node and remote node for connectread", eError); return 0; } /* * Initialize.... */ strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode)); strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode)); self = (pRemoteOBJ)ccmd->pPrivate; status = ConnectRead(self,pCon,rd); if(status == 1){ SCSendOK(pCon); } return status; } /*-----------------------------------------------------------------------------*/ static int HeartbeatTask(void *pData) { pRemoteOBJ self = (pRemoteOBJ)pData; int status; char command[] = {"Poch\r\n"}; if (time(NULL) > self->nextHeartbeat){ status = ANETwrite(self->readHandle,command, strlen(command)); if(status != 1){ traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); self->connected = 0; ConnectRemoteObject(self); if(!self->connected){ MarkDisconnected(self); } } self->nextHeartbeat = time(NULL) + 10; } return 1; } /*============================= writing related code =========================== The logic here is to use the standard writeHandle when available. I expect most communication to be short and to happen through the writeHandle. If that one is in use, a new connection will be built. --------------------------------------------------------------------------------- suppress all superfluous OK from the slave -----------------------------------------------------------------------------------*/ #include static OutCode findOutCode(char *txt) { int i; for(i = 0; i < iNoCodes; i++){ if(strstr(txt,pCode[i]) != NULL){ return i; } } return eValue; } /*--------------------------------------------------------------------------------*/ static void printSICS(char *answer, SConnection *pCon) { char line[1024], *pPtr, *pCode; OutCode eCode; pPtr = answer; while(pPtr != NULL){ memset(line,0,sizeof(line)); pPtr = stptok(pPtr,line,sizeof(line),"\n"); if(strstr(line,"OK") == NULL){ pCode = strstr(line,"@@"); if(pCode != NULL){ *pCode = '\0'; pCode += 2; eCode = findOutCode(trim(pCode)); } else { eCode = eValue; } SCWrite(pCon,line,eCode); } } } /*---------------------------------------------------------------------------------*/ static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) { int handle, length; char *answer = NULL; char command[80]; if(self->writeInUse) { handle = ANETconnect(self->host,self->port); if(handle < 0){ traceIO("RO","Failed to connect to %s at %d", self->host, self->port); if(pCon != NULL){ SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); } return handle; } ANETwrite(handle,login,strlen(login)); usleep(500); ANETprocess(); /* eat the login responses */ answer = ANETreadPtr(handle, &length); ANETreadConsume(handle,length); *newHandle = 1; transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); } else { self->writeInUse = 1; handle = self->writeHandle; /* eat dirt from the line */ answer = ANETreadPtr(handle, &length); ANETreadConsume(handle,length); } return handle; } /*---------------------------------------------------------------------------------*/ static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) { char *answer = NULL, *pEnd, *command = NULL; int length; while(1){ TaskYield(pServ->pTasker); if(!ANETvalidHandle(handle)){ SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); break; } answer = ANETreadPtr(handle,&length); if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ if(pCon != NULL){ *pEnd = '\0'; printSICS(answer,pCon); } traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); ANETreadConsume(handle,length); break; } } } /*---------------------------------------------------------------------------------*/ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, pHdbMessage mes) { pHdbDataMessage mm = NULL; int handle, status, length, newHandle = 0; pRemoteOBJ self = (pRemoteOBJ)userData; SConnection *pCon = NULL; pDynString data; char *remoteNode; char *command, *answer, *pEnd; if((mm = GetHdbSetMessage(mes)) != NULL){ pCon = (SConnection *)mm->callData; handle = PrepareWriteHandle(self,pCon,&newHandle); if(handle < 0){ return hdbAbort; } /* build the command to send */ data = formatValue(*(mm->v),currentNode); remoteNode = GetHdbProp(currentNode,"remotewrite"); length = 40 + strlen(remoteNode) + GetDynStringLength(data); command = malloc(length*sizeof(char)); if(command == NULL){ if(pCon != NULL){ SCWrite(pCon,"ERROR: out of memory writing remote node",eError); } return hdbAbort; } snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); /* write */ traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); status = ANETwrite(handle,command,strlen(command)); free(command); DeleteDynString(data); if(status < 0){ if(pCon != NULL){ SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); } return hdbAbort; } /* wait for a response: TRANSACTIONFINISHED */ ProcessWriteResponse(self,handle,pCon); /* Is there a termination script? */ command = GetHdbProp(currentNode,"termscript"); if(command != NULL){ while(1) { TaskYield(pServ->pTasker); Tcl_Eval(InterpGetTcl(pServ->pSics),command); answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); if(strstr(answer,"idle") != NULL){ answer = ANETreadPtr(handle,&length); printSICS(answer,pCon); traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); ANETreadConsume(handle,length); break; } } } if(newHandle){ ANETclose(handle); } else { self->writeInUse = 0; } } return hdbContinue; } /*------------------------------------------------------------------------------*/ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) { pHdb localNode = NULL; char command[1024], reply[1024], *pPtr; int status, type; localNode = FindHdbNode(NULL,rd.localNode, pCon); if(localNode == NULL){ SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode); return 0; } pPtr = GetHdbProp(localNode,"remotewrite"); if(pPtr != NULL){ SCPrintf(pCon,eError,"ERROR: %s alread connected to %s", rd.localNode, rd.remoteNode); return 0; } SetHdbProperty(localNode,"remotewrite",rd.remoteNode); AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, self,NULL)); /* * Get information about the remote node and check compatability */ snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); if(status != 1){ SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", self->host); MarkDisconnected(self); return 0; } if(strstr(reply, "ERROR") != NULL){ SCPrintf(pCon,eError,"%s while trying to contact remote node %s", reply, rd.remoteNode); return 0; } /* only interested in type: answer is of style: type,nochildren,length */ pPtr = strchr(reply,','); *pPtr= '\0'; type = convertHdbType(reply); if(type != localNode->value.dataType){ SCPrintf(pCon,eError, "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d", rd.localNode, rd.remoteNode, localNode->value.dataType, type); return 0; } return 1; } /*---------------------------------------------------------------------------------*/ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, Hdb * cmdNode, Hdb * par[], int nPar) { ReadData rd; int status; pRemoteOBJ self; if(nPar < 2) { SCWrite(pCon,"ERROR: need path to local node and remote node for connectwrite", eError); return 0; } /* * Initialize.... */ strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode)); strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode)); self = (pRemoteOBJ)ccmd->pPrivate; status = ConnectWrite(self,pCon,rd); if(status == 1){ SCSendOK(pCon); } return status; } /*============================ remote execute =================================*/ static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) { int status, handle, newHandle = 0, length; char *answer, *pEnd; handle = PrepareWriteHandle(self,pCon,&newHandle); if(handle < 0){ return 0; } /* write, thereby taking care to prefix with transact and for proper termination */ if(strstr(command,"transact") == NULL){ ANETwrite(handle,"transact ", sizeof("transact ")); } status = ANETwrite(handle,command,strlen(command)); if(strstr(command,"\n") == NULL){ ANETwrite(handle,"\r\n",2); } if(status < 0){ traceIO("RO","Disconnect from %s while executing %s", self->host, command); if(pCon != NULL){ SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } return 0; } /* wait for response */ while(1){ TaskYield(pServ->pTasker); if(!ANETvalidHandle(handle)){ if(pCon != NULL){ SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); } break; } answer = ANETreadPtr(handle,&length); if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ if(pCon != NULL){ *pEnd = '\0'; SCPrintf(pCon,eValue,answer); } ANETreadConsume(handle,length); break; } } if(newHandle){ ANETclose(handle); } else { self->writeInUse = 0; } return 1; } /*------------------------------------------------------------------------------*/ static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, Hdb * cmdNode, Hdb * par[], int nPar) { int status, i; char *pPtr; Tcl_DString com; pDynString val; pRemoteOBJ self; self = (pRemoteOBJ)ccmd->pPrivate; Tcl_DStringInit(&com); for (i = 0; i < nPar; i++) { val = formatValue(par[i]->value, par[i]); if (val != NULL) { Tcl_DStringAppend(&com, " ", 1); pPtr = GetCharArray(val); Tcl_DStringAppend(&com, pPtr, strlen(pPtr)); DeleteDynString(val); } } status = RemoteExecute(self,pCon,Tcl_DStringValue(&com)); Tcl_DStringFree(&com); return status; } /*============================= connect command ================================*/ static int ConnectCmd(pSICSOBJ ccmd, SConnection * pCon, Hdb * cmdNode, Hdb * par[], int nPar) { pRemoteOBJ self; self = (pRemoteOBJ)ccmd->pPrivate; ConnectRemoteObject(self); if(self->connected){ SCSendOK(pCon); } else { SCPrintf(pCon,eError,"ERROR: failed to connect to %s %d", self->host, self->port); return 0; } return 1; } /*============================ object initialisation etc =======================*/ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, int argc, char *argv[]) { pSICSOBJ pNew = NULL; pRemoteOBJ self = NULL; int status; pHdb cmd; char roTaskName[256]; if(argc < 4) { SCWrite(pCon,"ERROR: need name and remote host name and port in order to create remote object", eError); return 0; } strtolower(argv[1]); if(FindCommand(pSics,argv[1]) != NULL){ SCPrintf(pCon,eError, "ERROR: command %s already exists!", argv[1]); return 0; } pNew = MakeSICSOBJ(argv[1],"RemoteOBJ"); self = calloc(1, sizeof(RemoteOBJ)); if(pNew == NULL || self == NULL){ SCWrite(pCon,"ERROR: out of memory creating remote object", eError); return 0; } pNew->pPrivate = self; pNew->KillPrivate = KillRemoteOBJ; self->host = strdup(argv[2]); self->port = atoi(argv[3]); self->readList = LLDblobCreate(); ConnectRemoteObject(self); cmd = AddSICSHdbPar(pNew->objectNode, "connectread", usMugger, MakeSICSFunc(ConnectreadCmd)); AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText("")); AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText("")); cmd = AddSICSHdbPar(pNew->objectNode, "connectwrite", usMugger, MakeSICSFunc(ConnectwriteCmd)); AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText("")); AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText("")); cmd = AddSICSHdbPar(pNew->objectNode, "exe", usMugger, MakeSICSFunc(RemoteExecuteCmd)); AddSICSHdbPar(cmd, "args", usMugger, MakeHdbText("")); cmd = AddSICSHdbPar(pNew->objectNode, "connect", usMugger, MakeSICSFunc(ConnectCmd)); status = AddCommand(pSics, argv[1], InterInvokeSICSOBJ, KillSICSOBJ, pNew); snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); return status; } /*----------------------------------------------------------------------------------------*/ static int AddRemoteCallback(SConnection *pCon, SicsInterp *pSics, void *pData, int argc, char *argv[]) { pHdb localNode = NULL; pUpdateCallback up = NULL; if(argc < 3) { SCWrite(pCon,"ERROR: need path to local node and remote node for updatecb", eError); return 0; } localNode = FindHdbNode(NULL,argv[1], pCon); if(localNode == NULL){ SCPrintf(pCon,eError,"ERROR: local node %s not found", argv[1]); return 0; } up = malloc(sizeof(UpdateCallback)); if(up == NULL){ SCWrite(pCon,"ERROR: out of memory installing update callback",eError); return 0; } up->sendCon = SCCopyConnection(pCon); up->remotePath = strdup(argv[2]); AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROUpdateCallback, up,KillUpdateStruct)); /** * This is meant to send an update immediatly such that the remote node * is updated right away, */ NotifyHipadabaPar(localNode, NULL); SCSendOK(pCon); return 1; } /*----------------------------------------------------------------------------------------*/ void RemoteObjectInit(void) { AddCommand(pServ->pSics, "makeremo", MakeRemoteObject, NULL,NULL); AddCommand(pServ->pSics, "addremotecb", AddRemoteCallback, NULL,NULL); }