diff --git a/conman.c b/conman.c index 70c1b25c..88b2ad1f 100644 --- a/conman.c +++ b/conman.c @@ -1016,7 +1016,6 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut) return 1; } - /*-------------------------------------------------------------------------- special for ClientLog. Do not use elsewhere without check ----------------------------------------------------------------------------*/ @@ -1044,6 +1043,18 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut) if(pPtr != pBueffel){ free(pPtr); } + } else if(self->iProtocolID == 2) { + if (strlen(buffer) + 30 > 1024) { + pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); + memset(pPtr, 0, strlen(buffer) + 20); + } else { + pPtr = pBueffel; + } + sprintf(pPtr,"%s@@%s",buffer,pCode[iOut]); + testAndWriteSocket(self, pPtr, iOut); + if(pPtr != pBueffel){ + free(pPtr); + } } else { testAndWriteSocket(self, buffer, iOut); } diff --git a/hipadaba.c b/hipadaba.c index 806cdf4f..1514374c 100644 --- a/hipadaba.c +++ b/hipadaba.c @@ -22,6 +22,7 @@ static char update[] = { "update" }; static char treeChange[] = { "treeChange" }; static char dataSearch[] = { "dataSearch" }; static char killNode[] = { "killNode" }; +static char propertyChange[] = { "propertyChange" }; /*------------------------------------------------------------------------*/ pHdbDataMessage GetHdbSetMessage(pHdbMessage toTest) @@ -77,6 +78,15 @@ pHdbMessage GetHdbKillNodeMessage(pHdbMessage toTest) return NULL; } +/*-------------------------------------------------------------------------*/ +pHdbPropertyChange GetPropertyChangeMessage(pHdbMessage toTest) +{ + if (toTest->type == propertyChange) { + return (pHdbPropertyChange)toTest; + } + return NULL; +} + /*================== internal functions ===================================*/ void DeleteCallbackChain(pHdb node) { @@ -1140,6 +1150,8 @@ static int calcDataLength(pHdb node, int testLength) /*============================= Property Functions ==========================*/ void SetHdbProperty(pHdb node, char *key, char *value) { + hdbPropertyChange propMes; + if (node != NULL && key != NULL && node->properties != NULL) { if (value == NULL) { StringDictDelete(node->properties, key); @@ -1148,6 +1160,10 @@ void SetHdbProperty(pHdb node, char *key, char *value) } else { StringDictAddPair(node->properties, key, value); } + propMes.type = propertyChange; + propMes.key = key; + propMes.value = value; + InvokeCallbackChain(node,(pHdbMessage)&propMes); } } diff --git a/hipadaba.h b/hipadaba.h index be087396..f4f86328 100644 --- a/hipadaba.h +++ b/hipadaba.h @@ -26,6 +26,8 @@ * Added support for properties, Mark Koennecke, January 2007 * * Refactored callback handling, Markus Zolliker, Mark Koennecke, March 2008 + * + * Added property chnage events. Mark Koennecke, February 2015 */ #ifndef HIPADABA #define HIPADABA @@ -75,7 +77,8 @@ typedef struct __hipadaba { pStringDict properties; } Hdb, *pHdb; /*-------------- return values for callback functions -------------------------*/ -typedef enum { hdbContinue, +typedef enum { + hdbContinue, hdbAbort, hdbKill } hdbCallbackReturn; @@ -101,6 +104,12 @@ typedef struct { void *result; } hdbDataSearch, *pHdbDataSearch; /*-------------------------------------------------------------------------------*/ +typedef struct { + char *type; + char *key; + char *value; +} hdbPropertyChange, *pHdbPropertyChange; +/*-------------------------------------------------------------------------------*/ typedef hdbCallbackReturn(*hdbCallbackFunction) (pHdb currentNode, void *userData, pHdbMessage message); @@ -156,6 +165,14 @@ pHdbDataSearch GetHdbDataSearchMessage(pHdbMessage toTest); * pointer if it is. */ pHdbMessage GetHdbKillNodeMessage(pHdbMessage toTest); +/** + * Test a message if it is a property change message + * @param toTest The message to test. + * @return NULL if the message is no property chnage message or a message + * pointer if it is. + */ +pHdbPropertyChange GetPropertyChangeMessage(pHdbMessage toTest); + /*======================== Function protoypes: hdbData ========================*/ /** * make a hdbValue with the given datatype and length diff --git a/make_gen b/make_gen index 09527280..0749dfa2 100644 --- a/make_gen +++ b/make_gen @@ -46,7 +46,7 @@ SOBJ = network.o ifile.o conman.o SCinter.o splitter.o passwd.o \ rwpuffer.o asynnet.o background.o countersec.o hdbtable.o velosec.o \ histmemsec.o sansbc.o sicsutil.o strlutil.o genbinprot.o trace.o\ singlebinb.o taskobj.o sctcomtask.o tasmono.o multicountersec.o \ - messagepipe.o sicsget.o + messagepipe.o sicsget.o remoteobject.o MOTOROBJ = motor.o simdriv.o COUNTEROBJ = countdriv.o simcter.o counter.o diff --git a/motorsec.c b/motorsec.c index 51853b70..6f0ed33a 100644 --- a/motorsec.c +++ b/motorsec.c @@ -375,7 +375,7 @@ static float SecMotorGetValue(void *pData, SConnection * pCon) assert(pData); status = GetHdbProperty(self->pDescriptor->parNode,"geterror", error,sizeof(error)); if(status == 1 && strcmp(error,"none") != 0) { - SCPrintf(pCon,eValue,"ERROR: Failed to read %s with %s", self->name, error); + SCPrintf(pCon,eError,"ERROR: Failed to read %s with %s", self->name, error); return -9999999.99; } status = GetHipadabaPar(self->pDescriptor->parNode, &v, pCon); @@ -465,7 +465,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, pHdb child = NULL; pMotor self = NULL; float fHard, fVal, sign, zero; - char pBueffel[512], pError[132]; + char pBueffel[512], pError[132], *pPtr = NULL; int status; self = (pMotor) userData; @@ -526,6 +526,8 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, ServerWriteGlobal(pBueffel, eError); SCSetInterrupt(pCon, eAbortBatch); self->pDrivInt->iErrorCount = 0; + child = GetHipadabaNode(self->pDescriptor->parNode, "status"); + UpdateHipadabaPar(child, MakeHdbText("run"), pCon); return hdbAbort; } @@ -547,6 +549,12 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData, if (mm != NULL) { pCon = (SConnection *) mm->callData; SecMotorGetPar(self, "hardposition", &fVal); + child = GetHipadabaNode(self->pDescriptor->parNode, "hardposition"); + if((pPtr = GetHdbProp(child,"geterror")) != NULL){ + SetHdbProperty(node,"geterror",pPtr); + } else { + SetHdbProperty(node,"geterror",NULL); + } fVal = hardToSoftPosition(self, fVal); node->value.v.doubleValue = fVal; mm->v->v.doubleValue = fVal; @@ -561,6 +569,7 @@ static hdbCallbackReturn HardUpdateCallback(pHdb node, void *userData, pHdbMessage message) { pHdbDataMessage mm = NULL; + pHdbPropertyChange pm = NULL; pMotor self = (pMotor) userData; float fVal; hdbValue v; @@ -575,6 +584,18 @@ static hdbCallbackReturn HardUpdateCallback(pHdb node, void *userData, UpdateHipadabaPar(self->pDescriptor->parNode, v, mm->callData); return hdbContinue; } + + /* + forward geterror + */ + pm = GetPropertyChangeMessage(message); + if(pm != NULL){ + if(strstr(pm->key,"geterror") != NULL){ + SetHdbProperty(self->pDescriptor->parNode,pm->key, pm->value); + } + } + + return hdbContinue; } diff --git a/nserver.c b/nserver.c index 6462eca5..5f5fb02a 100644 --- a/nserver.c +++ b/nserver.c @@ -221,7 +221,6 @@ int InitServer(char *file, pServer * pServ) printf("Cannot find InterruptPort number in options file %s\n", "This value is required!"); DeleteInterp(self->pSics); - IFDeleteOptions(pSICSOptions); return 0; } iRet = sscanf(pText, "%d", &iPort); @@ -234,6 +233,8 @@ int InitServer(char *file, pServer * pServ) } /* install a secret fully priviledged entry point for ME */ AddUser("Achterbahn", "Kiel", usInternal); + /* install a secret entry point for remote objects */ + AddUser("RemoteMaster","3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184",usInternal); /* install environment monitor */ self->pMonitor = GetEnvMon(self->pSics); diff --git a/ofac.c b/ofac.c index 095482f9..88867250 100644 --- a/ofac.c +++ b/ofac.c @@ -48,6 +48,7 @@ static void InitGeneral(void) INIT(AddSyncedProt); INIT(MakeTrace); INIT(InitTaskOBJ); + INIT(RemoteObjectInit); INIT(SiteInit); /* site specific initializations */ } diff --git a/outcode.c b/outcode.c index eb38384f..7b7f8f77 100644 --- a/outcode.c +++ b/outcode.c @@ -26,5 +26,5 @@ static char *pCode[] = { "logerror", NULL }; -static int iNoCodes = 13; +static int iNoCodes = 15; #endif diff --git a/protocol.c b/protocol.c index a6226c17..73ddbc71 100644 --- a/protocol.c +++ b/protocol.c @@ -290,7 +290,7 @@ static int ProtocolSet(SConnection * pCon, Protocol * pPro, char *pProName) SCSetWriteFunc(pCon, SCWriteJSON_String); SCSetWriteFunc(pMaster, SCWriteJSON_String); break; - case 5: + case 5: /* ACT */ SCSetWriteFunc(pMaster, SCACTWrite); SCSetWriteFunc(pCon, SCACTWrite); break; diff --git a/remoteobject.c b/remoteobject.c new file mode 100644 index 00000000..5cad0a91 --- /dev/null +++ b/remoteobject.c @@ -0,0 +1,1008 @@ +/** + * Remote objects in sicsobj. This means accessing remote objects in a different + * SICS server from a master SICS server. + * + * Reading is implementd according to this scheme: + * + * * When a read connection is made between a local node and a remote node in slave, then a + * callback is installed on remote node in slave. + * * The callback on remote in slave sends commands to update the local node when either the + * value or the geterror property changes on the remote node. The commands sent are + * enclosed in special delimiters + * * A special ANET callback evaluates the data coming from slave and acts accordingly, thus + * updating the local node. This is driven by the general network driving code of SICS + * + * * in order to detect availability and re-availability of slave a Heartbeat Task sends a + * heartbeat message to slave. Thereby testing the connection regularly, trying to reconnect etc. + * + * COPRYRIGHT: see file COPYRIGHT + * + * Mark Koennecke, February 2015 +**/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define OOM -5001 /* out of memory */ +#define TO -5002 /* timeout */ + +static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"}; +extern char *trim(char *txt); +/*---------------------- our very private data structure -------------------*/ +typedef struct { + char *host; + int port; + int readHandle; + int writeHandle; + int writeInUse; + int readList; + unsigned int connected; + time_t nextHeartbeat; +} RemoteOBJ, *pRemoteOBJ; +/*----------------------------------------------------------------------------*/ +typedef struct { + char localNode[1024]; + char remoteNode[1024]; +} ReadData, *pReadData; +/*---------------------------------------------------------------------------*/ +typedef struct { + SConnection *sendCon; + char *remotePath; +} UpdateCallback, *pUpdateCallback; +/*----------------------------------------------------------------------------*/ +void KillRemoteOBJ(void *data) +{ + char roTaskName[132]; + + pRemoteOBJ self = (pRemoteOBJ) data; + if(data != NULL){ + snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); + StopTask(pServ->pTasker,roTaskName); + free(self->host); + ANETclose(self->readHandle); + ANETclose(self->writeHandle); + LLDdeleteBlob(self->readList); + } +} +/*========================= reading related code ================================*/ +static int RemoteReadCallback(int handle, void *userData) +{ + int length; + char *pPtr, *pStart, *pEnd; + + pPtr = ANETreadPtr(handle,&length); + + /* + * deal with command results + */ + pStart = strstr(pPtr, "TRANSACTIONSTART"); + pEnd = strstr(pPtr,"TRANSACTIONEND"); + if(pStart != NULL && pEnd != NULL){ + pStart = pStart + strlen("TRANSACTIONSTART"); + *pEnd = '\0'; + traceIO("RO","Received command - reply: %s", pStart); + pEnd += strlen("TRANSACTIONEND"); + ANETreadConsume(handle,pEnd - pPtr); + } + + /* + * deal with update messages + */ + pStart = strstr(pPtr, "SROC:"); + pEnd = strstr(pPtr,":EROC\r\n"); + if(pStart != NULL && pEnd != NULL){ + pStart += strlen("SROC:"); + *pEnd = '\0'; + InterpExecute(pServ->pSics, pServ->dummyCon,pStart); + traceIO("RO", "Received %s from remote", pStart); + pEnd += strlen("EROC\r\n"); + ANETreadConsume(handle,pEnd - pPtr); + } + + /* + * deal with heartbeats + */ + if((pStart = strstr(pPtr,"Poch")) != NULL){ + ANETreadConsume(handle,(pStart+4) - pPtr); + } + + /* + If there is more stuff to process: recurse + */ + pPtr = ANETreadPtr(handle,&length); + if(length > 0 && + ( strstr(pPtr,":EROC\r\n") != NULL || + strstr(pPtr,"TRANSACTIONEND") != NULL + || strstr(pPtr,"Poch") != NULL ) ) { + RemoteReadCallback(handle,userData); + } + + return 1; +} +/*-----------------------------------------------------------------------------*/ +static int transactCommand(int handle, char *command, char *reply, int replyLen) +{ + char *toSend = NULL; + char *prefix = {"transact "}; + int status, length, type; + time_t start; + char *pPtr; + + /* + * read possible dirt of the line + */ + pPtr = ANETreadPtr(handle,&length); + ANETreadConsume(handle,length); + + + toSend = malloc(strlen(command) + strlen(prefix) + 1); + if(toSend == NULL){ + return OOM; + } + strcpy(toSend, prefix); + strcat(toSend, command); + status = ANETwrite(handle,toSend,strlen(toSend)); + free(toSend); + if(status != 1){ + return status; + } + + /* + * wait for a reply for max 2 seconds + */ + start = time(NULL); + while(time(NULL) < start + 2.0){ + ANETprocess(); + pPtr = ANETreadPtr(handle,&length); + if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){ + strncpy(reply,pPtr,replyLen); + ANETreadConsume(handle,length); + return 1; + } + usleep(100); + } + + /* + * here we have run into a timeout + */ + ANETreadConsume(handle,length); + return TO; + +} +/*----------------------------------------------------------------------------*/ +static void ConnectRemoteObject(pRemoteOBJ self) +{ + char *pPtr, command[1024]; + int length, status; + ReadData rd; + pHdb node; + + + if(self->connected){ + return; + } + + self->readHandle = ANETconnect(self->host, self->port); + self->writeHandle = ANETconnect(self->host, self->port); + if(self->readHandle < 0 || self->writeHandle < 0){ + self->connected = 0; + traceIO("RO","Failed to connect to remote objects at %s, port %d", + self->host, self->port); + return; + } + traceIO("RO","Connected to %s, port %d for remote objects", + self->host, self->port); + + /* + Default login with hard coded manager login. Defined in + nserver.c + */ + ANETwrite(self->readHandle,login,strlen(login)); + ANETwrite(self->writeHandle,login,strlen(login)); + usleep(500); + ANETprocess(); + /* + eat the login responses + */ + pPtr = ANETreadPtr(self->readHandle, &length); + ANETreadConsume(self->readHandle,length); + pPtr = ANETreadPtr(self->writeHandle, &length); + ANETreadConsume(self->writeHandle,length); + + + /* + * install the read callback + */ + ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL); + + /* + * Remove geterror on read nodes and reinstall callbacks for reconnects + */ + status = LLDnodePtr2First(self->readList); + while(status != 0) { + LLDblobData(self->readList,&rd); + node = FindHdbNode(NULL,rd.localNode,NULL); + if(node != NULL){ + SetHdbProperty(node,"geterror",NULL); + snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", + rd.remoteNode, rd.localNode); + ANETwrite(self->readHandle,command,strlen(command)); + } + status = LLDnodePtr2Next(self->readList); + } + + transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command)); + + self->connected = 1; + self->writeInUse = 0; +} +/*-----------------------------------------------------------------------------*/ +static void MarkDisconnected(pRemoteOBJ self) +{ + int status; + ReadData rd; + pHdb node; + + status = LLDnodePtr2First(self->readList); + while(status != 0) { + LLDblobData(self->readList,&rd); + node = FindHdbNode(NULL,rd.localNode,NULL); + if(node != NULL){ + SetHdbProperty(node,"geterror","Disconnected from remote server"); + } + status = LLDnodePtr2Next(self->readList); + } + self->connected = 0; + } +/*-----------------------------------------------------------------------------*/ +static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData, + pHdbMessage mes) +{ + pUpdateCallback uppi = (pUpdateCallback)userData; + hdbDataMessage *mm = NULL; + pDynString text; + char *prefix = {"SROC:hupdate "}; + char *postfix= {":EROC\r\n"}; + char *txt = NULL; + int length; + pHdbPropertyChange propChange = NULL; + + mm = GetHdbUpdateMessage(mes); + if(mm != NULL){ + /* + * remove myself when the connection is dead... + */ + if(!SCisConnected(uppi->sendCon)){ + return hdbKill; + } + /* + * format and send the update command to master + */ + text = formatValue(*(mm->v), currentNode); + length = GetDynStringLength(text) + + strlen(prefix) + strlen(postfix) + strlen(uppi->remotePath) +5; + txt = malloc(length*sizeof(char)); + if(txt == NULL){ + return hdbContinue; + } + snprintf(txt,length,"%s %s %s %s", prefix, uppi->remotePath, + GetCharArray(text), postfix); + SCWrite(uppi->sendCon,txt,eValue); + free(txt); + DeleteDynString(text); + } + + propChange = GetPropertyChangeMessage(mes); + if(propChange != NULL){ + /* + * remove myself when the connection is dead... + */ + if(!SCisConnected(uppi->sendCon)){ + return hdbKill; + } + length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) + + strlen(propChange->key) + 10; + if(propChange->value != NULL){ + length += strlen(propChange->value); + } + txt = malloc(length*sizeof(char)); + if(txt == NULL){ + return hdbContinue; + } + if(propChange->value == NULL){ + snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath, + propChange->key,postfix); + } else { + snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath, + propChange->key,propChange->value, postfix); + } + SCWrite(uppi->sendCon,txt,eValue); + free(txt); + } + + return hdbContinue; +} +/*-----------------------------------------------------------------------------*/ +static hdbCallbackReturn GetErrorCallback(pHdb currentNode, void *userData, + pHdbMessage mes) +{ + hdbDataMessage *mm= NULL; + char *geterror, error[512]; + SConnection *con = NULL; + + + mm = GetHdbGetMessage(mes); + if (mm != NULL) { + con = mm->callData; + geterror = GetHdbProp(currentNode, "geterror"); + if (geterror != NULL) { + snprintf(error,sizeof(error),"ERROR: %s", geterror); + SCWrite(con, error, eError); + if (mm->v->dataType == HIPTEXT) { + if (mm->v->v.text != NULL) { + free(mm->v->v.text); + } + mm->v->v.text = strdup(error); + } + return hdbAbort; + } + } + return hdbContinue; +} +/*-----------------------------------------------------------------------------*/ +static void KillUpdateStruct(void *data) +{ + pUpdateCallback self = (pUpdateCallback)data; + if(data != NULL){ + SCDeleteConnection(self->sendCon); + free(self->remotePath); + free(self); + } +} +/*-----------------------------------------------------------------------------*/ +static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd) +{ + char command[1024], reply[1024], *pPtr; + int status, type; + pHdb localNode = NULL; + + /* + * Initialize.... + */ + localNode = FindHdbNode(NULL,rd.localNode, pCon); + if(localNode == NULL){ + SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode); + return 0; + } + + /** + * Refuse duplicate connections + */ + pPtr = GetHdbProp(localNode,"remoteread"); + if(pPtr != NULL){ + SCPrintf(pCon,eError,"ERROR: %s is already connected to %s", rd.localNode, pPtr); + return 0; + } + + /* + * Get information about the remote node and check compatability + */ + snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); + status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + if(status != 1){ + /* + * try a reconnect, + * when fails: + * Warning + * add blob + */ + self->connected = 0; + ConnectRemoteObject(self); + status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + if(status != 1){ + SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", + self->host); + MarkDisconnected(self); + LLDblobAdd(self->readList,&rd,sizeof(rd)); + AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback, + NULL,NULL)); + SetHdbProperty(localNode,"remoteread",rd.remoteNode); + return 1; + } + } + if(strstr(reply, "ERROR") != NULL){ + SCPrintf(pCon,eError,"%s while trying to contact remote node %s", + reply, rd.remoteNode); + return 0; + } + /* only interested in type: answer is of style: type,nochildren,length */ + pPtr = strchr(reply,','); + *pPtr= '\0'; + type = convertHdbType(reply); + if(type != localNode->value.dataType){ + SCPrintf(pCon,eError, + "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d", + rd.localNode, rd.remoteNode, localNode->value.dataType, type); + return 0; + } + + /* + * Make an entry in the read list + */ + LLDblobAdd(self->readList,&rd,sizeof(rd)); + AppendHipadabaCallback(localNode, MakeHipadabaCallback(GetErrorCallback, + NULL,NULL)); + + SetHdbProperty(localNode,"remoteread",rd.remoteNode); + + /* + * Install a callback on the remote node to update the master. The remote should + * then immediatly send an update which will be processed by the read callback. + */ + snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n", + rd.remoteNode, rd.localNode); + ANETwrite(self->readHandle,command,strlen(command)); + + return 1; +} + +/*-----------------------------------------------------------------------------*/ +static int ConnectreadCmd(pSICSOBJ ccmd, SConnection * pCon, + Hdb * cmdNode, Hdb * par[], int nPar) +{ + ReadData rd; + pHdb localNode = NULL; + char command[1024], reply[1024], *pPtr; + int status, type; + pRemoteOBJ self; + + + if(nPar < 2) { + SCWrite(pCon,"ERROR: need path to local node and remote node for connectread", + eError); + return 0; + } + + /* + * Initialize.... + */ + strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode)); + strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode)); + self = (pRemoteOBJ)ccmd->pPrivate; + + status = ConnectRead(self,pCon,rd); + + if(status == 1){ + SCSendOK(pCon); + } + + return status; +} +/*-----------------------------------------------------------------------------*/ +static int HeartbeatTask(void *pData) +{ + pRemoteOBJ self = (pRemoteOBJ)pData; + int status; + char command[] = {"Poch\r\n"}; + + if (time(NULL) > self->nextHeartbeat){ + status = ANETwrite(self->readHandle,command, strlen(command)); + if(status != 1){ + traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port); + self->connected = 0; + ConnectRemoteObject(self); + if(!self->connected){ + MarkDisconnected(self); + } + } + self->nextHeartbeat = time(NULL) + 10; + } + return 1; +} +/*============================= writing related code =========================== + The logic here is to use the standard writeHandle when available. I expect most + communication to be short and to happen through the writeHandle. If that one is + in use, a new connection will be built. + --------------------------------------------------------------------------------- + suppress all superfluous OK from the slave + -----------------------------------------------------------------------------------*/ +#include +static OutCode findOutCode(char *txt) +{ + int i; + + for(i = 0; i < iNoCodes; i++){ + if(strstr(txt,pCode[i]) != NULL){ + return i; + } + } + return eValue; +} +/*--------------------------------------------------------------------------------*/ +static void printSICS(char *answer, SConnection *pCon) +{ + char line[1024], *pPtr, *pCode; + OutCode eCode; + + pPtr = answer; + while(pPtr != NULL){ + memset(line,0,sizeof(line)); + pPtr = stptok(pPtr,line,sizeof(line),"\n"); + if(strstr(line,"OK") == NULL){ + pCode = strstr(line,"@@"); + if(pCode != NULL){ + *pCode = '\0'; + pCode += 2; + eCode = findOutCode(trim(pCode)); + + } else { + eCode = eValue; + } + SCWrite(pCon,line,eCode); + } + } +} +/*---------------------------------------------------------------------------------*/ +static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle) +{ + int handle, length; + char *answer = NULL; + char command[80]; + + if(self->writeInUse) { + handle = ANETconnect(self->host,self->port); + if(handle < 0){ + traceIO("RO","Failed to connect to %s at %d", self->host, self->port); + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port); + } + return handle; + } + ANETwrite(handle,login,strlen(login)); + usleep(500); + ANETprocess(); + /* + eat the login responses + */ + answer = ANETreadPtr(handle, &length); + ANETreadConsume(handle,length); + *newHandle = 1; + + transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command)); + + } else { + self->writeInUse = 1; + handle = self->writeHandle; + /* + eat dirt from the line + */ + answer = ANETreadPtr(handle, &length); + ANETreadConsume(handle,length); + } + return handle; +} +/*---------------------------------------------------------------------------------*/ +static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon) +{ + char *answer = NULL, *pEnd, *command = NULL; + int length; + + while(1){ + TaskYield(pServ->pTasker); + if(!ANETvalidHandle(handle)){ + SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host); + break; + } + answer = ANETreadPtr(handle,&length); + if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ + if(pCon != NULL){ + *pEnd = '\0'; + printSICS(answer,pCon); + } + traceIO("RO","%s:%d: Received %s", self->host, self->port,answer); + ANETreadConsume(handle,length); + break; + } + } + +} +/*---------------------------------------------------------------------------------*/ +static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData, + pHdbMessage mes) +{ + pHdbDataMessage mm = NULL; + int handle, status, length, newHandle = 0; + pRemoteOBJ self = (pRemoteOBJ)userData; + SConnection *pCon = NULL; + pDynString data; + char *remoteNode; + char *command, *answer, *pEnd; + + + if((mm = GetHdbSetMessage(mes)) != NULL){ + pCon = (SConnection *)mm->callData; + handle = PrepareWriteHandle(self,pCon,&newHandle); + if(handle < 0){ + return hdbAbort; + } + + /* + build the command to send + */ + data = formatValue(*(mm->v),currentNode); + remoteNode = GetHdbProp(currentNode,"remotewrite"); + length = 40 + strlen(remoteNode) + GetDynStringLength(data); + command = malloc(length*sizeof(char)); + if(command == NULL){ + if(pCon != NULL){ + SCWrite(pCon,"ERROR: out of memory writing remote node",eError); + } + return hdbAbort; + } + snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data)); + + /* + write + */ + traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); + status = ANETwrite(handle,command,strlen(command)); + free(command); + DeleteDynString(data); + if(status < 0){ + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host); + } + return hdbAbort; + } + + /* + wait for a response: TRANSACTIONFINISHED + */ + ProcessWriteResponse(self,handle,pCon); + + /* + Is there a termination script? + */ + command = GetHdbProp(currentNode,"termscript"); + if(command != NULL){ + while(1) { + TaskYield(pServ->pTasker); + Tcl_Eval(InterpGetTcl(pServ->pSics),command); + answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics)); + if(strstr(answer,"idle") != NULL){ + answer = ANETreadPtr(handle,&length); + printSICS(answer,pCon); + traceIO("RO","%s:%d:Received %s", self->host,self->port,answer); + ANETreadConsume(handle,length); + break; + } + } + } + + + if(newHandle){ + ANETclose(handle); + } else { + self->writeInUse = 0; + } + + } + + return hdbContinue; +} +/*------------------------------------------------------------------------------*/ +static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd) +{ + pHdb localNode = NULL; + char command[1024], reply[1024], *pPtr; + int status, type; + + localNode = FindHdbNode(NULL,rd.localNode, pCon); + if(localNode == NULL){ + SCPrintf(pCon,eError,"ERROR: local node %s not found", rd.localNode); + return 0; + } + + pPtr = GetHdbProp(localNode,"remotewrite"); + if(pPtr != NULL){ + SCPrintf(pCon,eError,"ERROR: %s alread connected to %s", rd.localNode, + rd.remoteNode); + return 0; + } + + SetHdbProperty(localNode,"remotewrite",rd.remoteNode); + AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback, + self,NULL)); + + /* + * Get information about the remote node and check compatability + */ + snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode); + status = transactCommand(self->writeHandle,command,reply,sizeof(reply)); + if(status != 1){ + SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...", + self->host); + MarkDisconnected(self); + return 0; + } + if(strstr(reply, "ERROR") != NULL){ + SCPrintf(pCon,eError,"%s while trying to contact remote node %s", + reply, rd.remoteNode); + return 0; + } + /* only interested in type: answer is of style: type,nochildren,length */ + pPtr = strchr(reply,','); + *pPtr= '\0'; + type = convertHdbType(reply); + if(type != localNode->value.dataType){ + SCPrintf(pCon,eError, + "ERROR: data type mismatch between local %s and remote %s, local type %d, remote type %d", + rd.localNode, rd.remoteNode, localNode->value.dataType, type); + return 0; + } + + return 1; +} +/*---------------------------------------------------------------------------------*/ +static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon, + Hdb * cmdNode, Hdb * par[], int nPar) +{ + ReadData rd; + int status; + pRemoteOBJ self; + + + if(nPar < 2) { + SCWrite(pCon,"ERROR: need path to local node and remote node for connectwrite", + eError); + return 0; + } + + /* + * Initialize.... + */ + strncpy(rd.localNode ,par[0]->value.v.text, sizeof(rd.localNode)); + strncpy(rd.remoteNode ,par[1]->value.v.text, sizeof(rd.remoteNode)); + self = (pRemoteOBJ)ccmd->pPrivate; + + status = ConnectWrite(self,pCon,rd); + + if(status == 1){ + SCSendOK(pCon); + } + return status; +} +/*============================ remote execute =================================*/ +static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command) +{ + int status, handle, newHandle = 0, length; + char *answer, *pEnd; + + handle = PrepareWriteHandle(self,pCon,&newHandle); + if(handle < 0){ + return 0; + } + + /* + write, thereby taking care to prefix with transact and for proper termination + */ + if(strstr(command,"transact") == NULL){ + ANETwrite(handle,"transact ", sizeof("transact ")); + } + status = ANETwrite(handle,command,strlen(command)); + if(strstr(command,"\n") == NULL){ + ANETwrite(handle,"\r\n",2); + } + if(status < 0){ + traceIO("RO","Disconnect from %s while executing %s", self->host, command); + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); + } + return 0; + } + + /* + wait for response + */ + while(1){ + TaskYield(pServ->pTasker); + if(!ANETvalidHandle(handle)){ + if(pCon != NULL){ + SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port); + } + break; + } + answer = ANETreadPtr(handle,&length); + if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){ + if(pCon != NULL){ + *pEnd = '\0'; + SCPrintf(pCon,eValue,answer); + } + ANETreadConsume(handle,length); + break; + } + } + + if(newHandle){ + ANETclose(handle); + } else { + self->writeInUse = 0; + } + + return 1; +} +/*------------------------------------------------------------------------------*/ +static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon, + Hdb * cmdNode, Hdb * par[], int nPar) +{ + int status, i; + char *pPtr; + Tcl_DString com; + pDynString val; + pRemoteOBJ self; + + self = (pRemoteOBJ)ccmd->pPrivate; + + Tcl_DStringInit(&com); + for (i = 0; i < nPar; i++) { + val = formatValue(par[i]->value, par[i]); + if (val != NULL) { + Tcl_DStringAppend(&com, " ", 1); + pPtr = GetCharArray(val); + Tcl_DStringAppend(&com, pPtr, strlen(pPtr)); + DeleteDynString(val); + } + } + + status = RemoteExecute(self,pCon,Tcl_DStringValue(&com)); + Tcl_DStringFree(&com); + + return status; + +} +/*============================= connect command ================================*/ +static int ConnectCmd(pSICSOBJ ccmd, SConnection * pCon, + Hdb * cmdNode, Hdb * par[], int nPar) +{ + pRemoteOBJ self; + + self = (pRemoteOBJ)ccmd->pPrivate; + ConnectRemoteObject(self); + + if(self->connected){ + SCSendOK(pCon); + } else { + SCPrintf(pCon,eError,"ERROR: failed to connect to %s %d", self->host, self->port); + return 0; + } + + return 1; +} +/*============================ object initialisation etc =======================*/ +static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData, + int argc, char *argv[]) +{ + pSICSOBJ pNew = NULL; + pRemoteOBJ self = NULL; + int status; + pHdb cmd; + char roTaskName[256]; + + if(argc < 4) { + SCWrite(pCon,"ERROR: need name and remote host name and port in order to create remote object", + eError); + return 0; + } + + strtolower(argv[1]); + if(FindCommand(pSics,argv[1]) != NULL){ + SCPrintf(pCon,eError, "ERROR: command %s already exists!", argv[1]); + return 0; + } + + pNew = MakeSICSOBJ(argv[1],"RemoteOBJ"); + self = calloc(1, sizeof(RemoteOBJ)); + if(pNew == NULL || self == NULL){ + SCWrite(pCon,"ERROR: out of memory creating remote object", eError); + return 0; + } + pNew->pPrivate = self; + pNew->KillPrivate = KillRemoteOBJ; + self->host = strdup(argv[2]); + self->port = atoi(argv[3]); + self->readList = LLDblobCreate(); + ConnectRemoteObject(self); + + cmd = AddSICSHdbPar(pNew->objectNode, + "connectread", usMugger, MakeSICSFunc(ConnectreadCmd)); + AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText("")); + AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText("")); + + + cmd = AddSICSHdbPar(pNew->objectNode, + "connectwrite", usMugger, MakeSICSFunc(ConnectwriteCmd)); + AddSICSHdbPar(cmd, "localnode", usMugger, MakeHdbText("")); + AddSICSHdbPar(cmd, "remotenode", usMugger, MakeHdbText("")); + + + cmd = AddSICSHdbPar(pNew->objectNode, + "exe", usMugger, MakeSICSFunc(RemoteExecuteCmd)); + AddSICSHdbPar(cmd, "args", usMugger, MakeHdbText("")); + + cmd = AddSICSHdbPar(pNew->objectNode, + "connect", usMugger, MakeSICSFunc(ConnectCmd)); + + + status = AddCommand(pSics, + argv[1], + InterInvokeSICSOBJ, + KillSICSOBJ, pNew); + + snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port); + TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1); + + return status; +} +/*----------------------------------------------------------------------------------------*/ +static int AddRemoteCallback(SConnection *pCon, SicsInterp *pSics, void *pData, + int argc, char *argv[]) +{ + pHdb localNode = NULL; + pUpdateCallback up = NULL; + + if(argc < 3) { + SCWrite(pCon,"ERROR: need path to local node and remote node for updatecb", + eError); + return 0; + } + + localNode = FindHdbNode(NULL,argv[1], pCon); + if(localNode == NULL){ + SCPrintf(pCon,eError,"ERROR: local node %s not found", argv[1]); + return 0; + } + + up = malloc(sizeof(UpdateCallback)); + if(up == NULL){ + SCWrite(pCon,"ERROR: out of memory installing update callback",eError); + return 0; + } + up->sendCon = SCCopyConnection(pCon); + up->remotePath = strdup(argv[2]); + AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROUpdateCallback, + up,KillUpdateStruct)); + /** + * This is meant to send an update immediatly such that the remote node + * is updated right away, + */ + NotifyHipadabaPar(localNode, NULL); + + SCSendOK(pCon); + return 1; + +} +/*----------------------------------------------------------------------------------------*/ +void RemoteObjectInit(void) +{ + + AddCommand(pServ->pSics, + "makeremo", + MakeRemoteObject, + NULL,NULL); + + AddCommand(pServ->pSics, + "addremotecb", + AddRemoteCallback, + NULL,NULL); + + +} diff --git a/rwpuffer.c b/rwpuffer.c index 500a8595..68acf5e0 100644 --- a/rwpuffer.c +++ b/rwpuffer.c @@ -131,5 +131,6 @@ void RemoveRWBufferData(prwBuffer self, int count) if (self->startPtr >= self->endPtr) { self->startPtr = 0; self->endPtr = 0; + memset(self->data,0,self->length*sizeof(char)); } } diff --git a/sicsget.c b/sicsget.c index 6e96c93e..2924a2a6 100644 --- a/sicsget.c +++ b/sicsget.c @@ -89,7 +89,11 @@ static int SICSGetCommand(SConnection * pCon, SicsInterp * pSics, void *pData, return 0; } } else { - SCPrintf(pCon,eError,"ERROR: value for %s not found", argv[1]); + if(v.dataType == HIPTEXT && strstr(v.v.text,"ERROR") != NULL){ + SCPrintf(pCon,eError,v.v.text); + } else { + SCPrintf(pCon,eError,"ERROR: value for %s not found", argv[1]); + } return 0; } ReleaseHdbValue(&v); @@ -326,10 +330,20 @@ static int GetHdbFunc(void *ms, void *userData) { pSSGMessage self = (pSSGMessage)ms; pHdb node = NULL; + char *geterror = NULL, error[512]; + hdbValue ve; node = FindHdbNode(NULL,self->name,NULL); if(node != NULL){ - cloneHdbValue(&node->value, self->v); + geterror = GetHdbProp(node,"geterror"); + if(geterror != NULL){ + snprintf(error,sizeof(error),"ERROR: %s",geterror); + ve = MakeHdbText(strdup(error)); + cloneHdbValue(&ve, self->v); + ReleaseHdbValue(&ve); + } else { + cloneHdbValue(&node->value, self->v); + } self->success = 1; return MPSTOP; } else { @@ -381,14 +395,28 @@ static int GetDrivableFunc(void *ms, void *userData) pIDrivable pDriv = NULL; float fVal; hdbValue v; + int oldMacro; data = FindCommandData(pServ->pSics, self->name,NULL); if(data != NULL){ pDriv = GetDrivableInterface(data); if(pDriv != NULL){ + /* + All this macro flag handling is there to get hold of a + error message stored in the Tcl interpreter if there is + one. + */ + oldMacro = SCinMacro(pServ->dummyCon); + SCsetMacro(pServ->dummyCon,1); fVal = pDriv->GetValue(data,pServ->dummyCon); - v = MakeHdbFloat(fVal); - self->success = 1; + SCsetMacro(pServ->dummyCon,oldMacro); + if(fVal < -900000) { + v = MakeHdbText(Tcl_GetStringResult(InterpGetTcl(pServ->pSics))); + self->success = 0; + } else { + v = MakeHdbFloat(fVal); + self->success = 1; + } cloneHdbValue(&v,self->v); return MPSTOP; } diff --git a/sicshdbfactory.c b/sicshdbfactory.c index 4cdf4e97..ef6356cc 100644 --- a/sicshdbfactory.c +++ b/sicshdbfactory.c @@ -339,6 +339,7 @@ static int MakeCommandNode(pHdb parent, char *name, SConnection * pCon, node->value.v.text = strdup(argv[3]); node->value.arrayLength = strlen(argv[3]); SetHdbProperty(node, "sicscommand", argv[3]); + SetHdbProperty(node, "scriptcommand", "yes"); kalle = MakeHipadabaCallback(CommandSetCallback, NULL, NULL); if (kalle == NULL) {