Merge branch 'develop' of ssh://gitorious.psi.ch/sinqdev/sics into develop

This commit is contained in:
2015-06-11 16:26:57 +02:00
15 changed files with 355 additions and 324 deletions

View File

@ -581,6 +581,7 @@ void *ANETreadPtr(int handle, int *length)
con = findSocketDescriptor(handle);
if (con == NULL) {
*length = 0;
return NULL;
} else {
data = GetRWBufferData(con->readBuffer, length);

View File

@ -74,6 +74,15 @@
#include "sicshipadaba.h"
#include "protocol.h"
#include "sicsvar.h"
#include <json/json.h>
/*
Greetings from protocol.c for SCLogWrite...
*/
extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer,
int iOut);
/*
#define UUDEB 1
define UUDEB , for buffer writing for checking encoding */
@ -255,6 +264,7 @@ static SConnection *CreateConnection(SicsInterp * pSics)
pRes->conStart = time(NULL);
pRes->write = SCNormalWrite;
pRes->runLevel = RUNDRIVE;
pRes->remote = 0;
/* initialise context variables */
pRes->iCmdCtr = 0;
@ -487,6 +497,7 @@ SConnection *SCCopyConnection(SConnection * pCon)
result->iList = -1;
result->runLevel = pCon->runLevel;
result->data = pCon->data;
result->remote = pCon->remote;
return result;
}
@ -1080,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
{
char pBueffel[1024];
char *pPtr;
json_object *myJson = NULL;
/* for commandlog tail */
if (!VerifyConnection(self)) {
return 0;
}
if(self->iProtocolID == 5) {
if(self->iProtocolID == PROTACT) { /* act */
if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20);
@ -1098,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){
free(pPtr);
}
} else if(self->iProtocolID == PROTJSON) {
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else {
testAndWriteSocket(self, buffer, iOut);
}
@ -1111,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
{
char pBueffel[1024];
char *pPtr;
json_object *myJson = NULL;
if (!VerifyConnection(self)) {
return 0;
@ -1119,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
WriteToCommandLogId(NULL, self->sockHandle, buffer);
SetSendingConnection(NULL);
if(self->iProtocolID == 5) {
if(self->iProtocolID == PROTACT) { /* act */
if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20);
@ -1131,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){
free(pPtr);
}
} else if(self->iProtocolID == 2) {
} else if(self->iProtocolID == PROTCODE) { /* withcode */
if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20);
@ -1143,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){
free(pPtr);
}
} else if(self->iProtocolID == PROTJSON) { /* json */
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else {
testAndWriteSocket(self, buffer, iOut);
}
@ -1323,7 +1348,7 @@ int SCWriteZipped(SConnection * self, char *pName, void *pData,
memset(outBuf, 0, 65536);
protocolID = GetProtocolID(self);
if (protocolID == 5) {
if (protocolID == PROTACT) {
cc = SCGetContext(self);
sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName,
compressedLength, cc.transID);
@ -1398,7 +1423,7 @@ int SCWriteBinary(SConnection * self, char *pName, void *pData,
memset(outBuf, 0, 65536);
protocolID = GetProtocolID(self);
if (protocolID == 5) {
if (protocolID == PROTACT) {
cc = SCGetContext(self);
sprintf(outBuf, "SICSBIN BIN %s %d %d\r\n", pName,
iDataLen, cc.transID);
@ -1504,7 +1529,7 @@ int SCWriteZippedOld(SConnection * self, char *pName, void *pData,
memset(outBuf, 0, 65536);
protocolID = GetProtocolID(self);
if (protocolID == 5) {
if (protocolID == PROTACT) {
cc = SCGetContext(self);
sprintf(outBuf, "SICSBIN ZIP %s %d %d\r\n", pName,
compressedLength, cc.transID);
@ -1830,7 +1855,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand)
memset(pBueffel, 0, 80);
stptok(trim(pCommand), pBueffel, 79, " ");
self->iCmdCtr++;
if (999999 < self->iCmdCtr) {
if (self->iCmdCtr > 99998) {
self->iCmdCtr = 0;
}
self->transID = self->iCmdCtr;
@ -1856,6 +1881,7 @@ int SCInvoke(SConnection * self, SicsInterp * pInter, char *pCommand)
config File Filename Logs to another file
config output normal | withcode | ACT Sets output mode
config listen 0 | 1 enables commandlog listen mode
config remote sets the remote connection flag
---------------------------------------------------------------------------*/
int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData,
@ -1913,7 +1939,11 @@ int ConfigCon(SConnection * pCon, SicsInterp * pSics, void *pData,
SCSendOK(pCon);
return 1;
}
}
} else if(strcmp(argv[1],"remote") == 0) {
pMaster->remote = 1;
pCon->remote = 1;
return 1;
}
/* check no or args */
if (argc < 3) {

View File

@ -71,6 +71,7 @@ typedef struct __SConnection {
pCosta pStack; /* stack of pending commands */
int contextStack; /* context stack: may go? */
mkChannel *pSock; /* for temporary backwards compatability */
int remote; /* true if this is a remote object connection */
} SConnection;
#include "nserver.h"

View File

@ -184,13 +184,17 @@ static int SecCtrCheckStatus(void *pData, SConnection *pCon)
fControl = v.v.doubleValue;
} else {
node = GetHipadabaNode(self->pDes->parNode,"values");
assert(node != NULL);
/*
The 1 below is only correct for PSI where only the first
monitor can be the control monitor. Elsewhere this must be the
control monitor channel
*/
fControl = v.v.intArray[1];
if(node != NULL) {
/*
This can be NULL if the counter is a HM. The values does not
exist and fControl is useless
The 1 below is only correct for PSI where only the first
monitor can be the control monitor. Elsewhere this must be the
control monitor channel
*/
fControl = v.v.intArray[1];
}
}

View File

@ -151,9 +151,6 @@ static DevAction *DevNextAction(DevSer * devser)
}
static void LogStart(DevSer *self)
{
if(self->startTime > 0){
printf("DEVSER: there is something fucked up in LogStart. Investigate!\n");
}
self->startTime = DoubleTime();
}
static void LogResponse(DevSer *self, int error)

View File

@ -411,6 +411,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon,
SCWrite(pCon, "ERROR: store: no files open", eLogError);
return 0;
}
priv->count++;
/* get necessary data */
fSum = 0.;
@ -497,7 +498,7 @@ static int FourMessStoreIntern(pSICSOBJ self, SConnection * pCon,
fPreset, fTemp, prot, pBueffel);
} else {
fprintf(priv->profFile, "%3d %7.4f %9.0f %7.3f %12f %s %s\n", iNP, fStep,
fPreset, fTemp, prot, extra, pBueffel);
fPreset, fTemp, prot, pBueffel,extra);
}
for (i = 0; i < iNP; i++) {
for (ii = 0; ii < 10 && i < iNP; ii++) {

View File

@ -242,6 +242,9 @@ static int DriveTaskFunc(void *data)
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
}
traceSys("drive","DriveTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
}
return 0;
}
/*--------------------------------------------------------------------------*/
@ -272,6 +275,9 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget)
ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name);
InvokeNewTarget(pServ->pExecutor,name,fTarget);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = DRIVEID;
taskData->obj = obj;
@ -393,6 +399,9 @@ static int CountTaskFunc(void *data)
ExeInterest(pServ->pExecutor,taskData->name, "finished with problem");
}
traceSys("count","CountTask %s finished with state %d", taskData->name,status);
if(taskData->pCon->transID > 100000) {
SCPrintf(taskData->pCon,eLog,"TASKEND %d", taskData->pCon->transID);
}
return 0;
}
/*--------------------------------------------------------------------------*/
@ -418,6 +427,9 @@ long StartCountTask(void *obj, SConnection *pCon, char *name)
}
ExeInterest(pServ->pExecutor,name,"started");
DevexecLog("START",name);
if(pCon->transID > 100000) {
SCPrintf(pCon,eLog,"TASKSTART %d", pCon->transID);
}
taskData->id = COUNTID;
taskData->obj = obj;

View File

@ -298,7 +298,7 @@ static int SecMotorStatus(void *sulf, SConnection * pCon)
int status;
pHdb node = NULL;
hdbValue v;
float interrupt;
float interrupt = 0.;
char error[132];
assert(sulf);
@ -527,7 +527,7 @@ static hdbCallbackReturn SecMotorCallback(pHdb node, void *userData,
SCSetInterrupt(pCon, eAbortBatch);
self->pDrivInt->iErrorCount = 0;
child = GetHipadabaNode(self->pDescriptor->parNode, "status");
UpdateHipadabaPar(child, MakeHdbText("run"), pCon);
UpdateHipadabaPar(child, MakeHdbText("error"), pCon);
return hdbAbort;
}

12
nread.c
View File

@ -42,6 +42,8 @@
#include "commandlog.h"
#include "uselect.h"
#include "trace.h"
#include "protocol.h"
extern pServer pServ;
extern int VerifyChannel(mkChannel * self); /* defined in network.c */
@ -296,7 +298,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem)
if (strlen(pItem->pHold) > 0) {
strlcat(pItem->pHold, pPtr, 511);
/* DFC locking for protocol zero only */
if (pItem->pCon->iProtocolID == 0 &&
if (pItem->pCon->iProtocolID == PROTSICS &&
CostaLocked(pItem->pCon->pStack))
iStat = 0;
else
@ -308,7 +310,7 @@ static int NetReadRead(pNetRead self, pNetItem pItem)
} else {
/* no, normal command */
/* DFC locking for protocol zero only */
if (pItem->pCon->iProtocolID == 0 &&
if (pItem->pCon->iProtocolID == PROTSICS &&
CostaLocked(pItem->pCon->pStack))
iStat = 0;
else
@ -498,7 +500,7 @@ static int TelnetRead(pNetRead self, pNetItem pItem)
case '\r':
case '\n':
/* DFC locking for protocol zero only */
if (pItem->pCon->iProtocolID == 0 &&
if (pItem->pCon->iProtocolID == PROTSICS &&
CostaLocked(pItem->pCon->pStack))
iStat = 0;
else
@ -1076,7 +1078,7 @@ static int CommandDataCB(int handle, void *userData)
if (pPtr[i] == '\r' || pPtr[i] == '\n') {
self->state = SKIPTERM;
if (!testAndInvokeInterrupt(self, handle)) {
if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack))
if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack))
status = 0;
else
status = CostaTop(self->pCon->pStack, GetCharArray(self->command));
@ -1180,7 +1182,7 @@ static int ANETTelnetProcess(int handle, void *usData)
case '\r':
case '\n':
if (!testAndInvokeInterrupt(self, handle)) {
if (self->pCon->iProtocolID == 0 && CostaLocked(self->pCon->pStack))
if (self->pCon->iProtocolID == PROTSICS && CostaLocked(self->pCon->pStack))
status = 0;
else
status = CostaTop(self->pCon->pStack, GetCharArray(self->command));

View File

@ -32,6 +32,10 @@ typedef struct __Protocol {
int isDefaultSet;
char *pProList[PROLISTLEN]; /* list of valid protocols? */
} Protocol;
/*================================================================================================
WARNING: These two char arrays may replicate things defined elsewhere. They may be out of
sync with the rest of SIS. Keep in mind.....
==================================================================================================*/
char *pEventType[] = {
"VALUECHANGE", /* 0 */
@ -188,7 +192,13 @@ static int ContextDo(SConnection * pCon, SicsInterp * pSics, void *pData,
SCWrite(pCon, "ERROR: no more memory", eError);
return 0;
}
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMSTART %d", comCon->transID);
}
status = InterpExecute(pSics, comCon, command);
if(comCon->transID > 100000) {
SCPrintf(comCon,eLog,"COMEND %d", comCon->transID);
}
if (command != buffer)
free(command);
SCDeleteConnection(comCon);
@ -271,29 +281,29 @@ static int ProtocolSet(SConnection * pCon, Protocol * pPro, char *pProName)
return 0;
break;
case 1: /* normal (connection start default) */
case PROTNORM: /* normal (connection start default) */
SCSetWriteFunc(pMaster, SCNormalWrite);
SCSetWriteFunc(pCon, SCNormalWrite);
break;
case 2: /* outcodes */
case PROTCODE: /* outcodes */
SCSetWriteFunc(pMaster, SCWriteWithOutcode);
SCSetWriteFunc(pCon, SCWriteWithOutcode);
break;
case 3: /* json */
case PROTJSON: /* json */
SCSetWriteFunc(pCon, SCWriteJSON_String);
SCSetWriteFunc(pMaster, SCWriteJSON_String);
break;
case 4: /* ACT */
case PROTACT: /* ACT */
SCSetWriteFunc(pMaster, SCACTWrite);
SCSetWriteFunc(pCon, SCACTWrite);
break;
case 5:
case PROTALL:
SCSetWriteFunc(pMaster, SCAllWrite);
SCSetWriteFunc(pCon, SCAllWrite);
break;
case 0: /* default = psi_sics */
case PROTSICS: /* default = psi_sics */
default:
SCSetWriteFunc(pMaster, pPro->defaultWriter);
SCSetWriteFunc(pCon, pPro->defaultWriter);
@ -332,11 +342,11 @@ int ProtocolGet(SConnection * pCon, void *pData, char *pProName, int len)
/* check list of protocols for valid name */
switch (Index) {
case 0: /* default = psi_sics */
case 1: /* normal (connection start default) */
case 2: /* outcodes */
case 3: /* json */
case 4: /* act */
case PROTSICS: /* default = psi_sics */
case PROTNORM: /* normal (connection start default) */
case PROTCODE: /* outcodes */
case PROTJSON: /* json */
case PROTACT: /* act */
pProName = pPro->pProList[Index];
return 1;
break;
@ -441,7 +451,7 @@ static int InitDefaultProtocol(SConnection * pCon, Protocol * pPro)
if (0 == pPro->isDefaultSet) {
pPro->defaultWriter = SCGetWriteFunc(pCon);
pPro->isDefaultSet = 1;
pCon->iProtocolID = 0;
pCon->iProtocolID = PROTSICS;
}
return pPro->isDefaultSet;
}
@ -628,10 +638,11 @@ char *GetProtocolName(SConnection * pCon)
/* check list of protocols for valid name */
switch (pCon->iProtocolID) {
case 0: /* default = psi_sics */
case 1: /* normal (connection start default) */
case 2: /* outcodes */
case 3: /* json */
case PROTSICS: /* default = psi_sics */
case PROTNORM: /* normal (connection start default) */
case PROTCODE: /* outcodes */
case PROTJSON: /* json */
case PROTACT: /* act */
return strdup(pPro->pProList[pCon->iProtocolID]);
break;
default:
@ -654,13 +665,13 @@ writeFunc GetProtocolWriteFunc(SConnection * pCon)
{
if (pCon != NULL) {
switch (pCon->iProtocolID) {
case 2: /* outcodes */
case PROTCODE: /* outcodes */
return SCWriteWithOutcode;
break;
case 3: /* json */
case PROTJSON: /* json */
return SCWriteJSON_String;
break;
case 4:
case PROTACT:
return SCACTWrite;
break;
default:

View File

@ -15,6 +15,14 @@ static char *pProTags[3] = {
#define esStart -1
#define esFinish -2
/*---------------------- protocol defines -------------------------------*/
#define PROTSICS 0
#define PROTNORM 1
#define PROTCODE 2
#define PROTJSON 3
#define PROTACT 4
#define PROTALL 5
/*--------------------- lifecycle -------------------------------------- */
int InstallProtocol(SConnection * pCon, SicsInterp * pSics, void *pData,
int argc, char *argv[]);

View File

@ -2,7 +2,7 @@
* Remote objects in sicsobj. This means accessing remote objects in a different
* SICS server from a master SICS server.
*
* Reading is implementd according to this scheme:
* Reading is implemented according to this scheme:
*
* * When a read connection is made between a local node and a remote node in slave, then a
* callback is installed on remote node in slave.
@ -17,7 +17,7 @@
*
* COPRYRIGHT: see file COPYRIGHT
*
* Mark Koennecke, February 2015
* Mark Koennecke, February-May 2015
**/
#include <unistd.h>
#include <time.h>
@ -31,22 +31,29 @@
#include <lld_blob.h>
#include <dynstring.h>
#include <stptok.h>
#include <json/json.h>
#define OOM -5001 /* out of memory */
#define TO -5002 /* timeout */
#define READACT 7654
#define POCHACT 8437
static char *login = {"RemoteMaster 3ed4c656a15f0aa45e02fd5ec429225bb93b762e7eb06cc81a0b4f6c35c76184\r\n"};
extern char *trim(char *txt);
static int transactionID = 100000;
/*---------------------- our very private data structure -------------------*/
typedef struct {
char *host;
int port;
int readHandle;
int writeHandle;
int writeInUse;
int handle;
int transactHandle;
int readList;
int writeList;
unsigned int connected;
time_t nextHeartbeat;
struct json_tokener *jtok;
} RemoteOBJ, *pRemoteOBJ;
/*----------------------------------------------------------------------------*/
typedef struct {
@ -59,6 +66,12 @@ typedef struct {
char *remotePath;
} UpdateCallback, *pUpdateCallback;
/*----------------------------------------------------------------------------*/
typedef struct {
int transID;
SConnection *pCon;
int waitTask;
}writeData, *pWriteData;
/*----------------------------------------------------------------------------*/
void KillRemoteOBJ(void *data)
{
char roTaskName[132];
@ -68,66 +81,13 @@ void KillRemoteOBJ(void *data)
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
StopTask(pServ->pTasker,roTaskName);
free(self->host);
ANETclose(self->readHandle);
ANETclose(self->writeHandle);
ANETclose(self->handle);
ANETclose(self->transactHandle);
LLDdeleteBlob(self->readList);
LLDdeleteBlob(self->writeList);
json_tokener_free(self->jtok);
}
}
/*========================= reading related code ================================*/
static int RemoteReadCallback(int handle, void *userData)
{
int length;
char *pPtr, *pStart, *pEnd;
pPtr = ANETreadPtr(handle,&length);
/*
* deal with command results
*/
pStart = strstr(pPtr, "TRANSACTIONSTART");
pEnd = strstr(pPtr,"TRANSACTIONEND");
if(pStart != NULL && pEnd != NULL){
pStart = pStart + strlen("TRANSACTIONSTART");
*pEnd = '\0';
traceIO("RO","Received command - reply: %s", pStart);
pEnd += strlen("TRANSACTIONEND");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with update messages
*/
pStart = strstr(pPtr, "SROC:");
pEnd = strstr(pPtr,":EROC\r\n");
if(pStart != NULL && pEnd != NULL){
pStart += strlen("SROC:");
*pEnd = '\0';
InterpExecute(pServ->pSics, pServ->dummyCon,pStart);
traceIO("RO", "Received %s from remote", pStart);
pEnd += strlen("EROC\r\n");
ANETreadConsume(handle,pEnd - pPtr);
}
/*
* deal with heartbeats
*/
if((pStart = strstr(pPtr,"Poch")) != NULL){
ANETreadConsume(handle,(pStart+4) - pPtr);
}
/*
If there is more stuff to process: recurse
*/
pPtr = ANETreadPtr(handle,&length);
if(length > 0 &&
( strstr(pPtr,":EROC\r\n") != NULL ||
strstr(pPtr,"TRANSACTIONEND") != NULL
|| strstr(pPtr,"Poch") != NULL ) ) {
RemoteReadCallback(handle,userData);
}
return 1;
}
/*-----------------------------------------------------------------------------*/
static int transactCommand(int handle, char *command, char *reply, int replyLen)
{
@ -135,7 +95,7 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
char *prefix = {"transact "};
int status, length, type;
time_t start;
char *pPtr;
char *pPtr, *pEnd;
/*
* read possible dirt of the line
@ -144,12 +104,15 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
ANETreadConsume(handle,length);
toSend = malloc(strlen(command) + strlen(prefix) + 1);
toSend = malloc(strlen(command) + strlen(prefix) + 10);
if(toSend == NULL){
return OOM;
}
strcpy(toSend, prefix);
strcat(toSend, command);
if(strstr(command,"\n") == NULL){
strcat(toSend,"\r\n");
}
status = ANETwrite(handle,toSend,strlen(toSend));
free(toSend);
if(status != 1){
@ -163,7 +126,8 @@ static int transactCommand(int handle, char *command, char *reply, int replyLen)
while(time(NULL) < start + 2.0){
ANETprocess();
pPtr = ANETreadPtr(handle,&length);
if(length > 0 && strstr(pPtr,"TRANSACTIONFINISHED") != NULL){
if(length > 0 && (pEnd = strstr(pPtr,"TRANSACTIONFINISHED")) != NULL){
*pEnd = '\0';
strncpy(reply,pPtr,replyLen);
ANETreadConsume(handle,length);
return 1;
@ -191,9 +155,9 @@ static void ConnectRemoteObject(pRemoteOBJ self)
return;
}
self->readHandle = ANETconnect(self->host, self->port);
self->writeHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0){
self->handle = ANETconnect(self->host, self->port);
self->transactHandle = ANETconnect(self->host, self->port);
if(self->handle < 0 || self->transactHandle < 0){
self->connected = 0;
traceIO("RO","Failed to connect to remote objects at %s, port %d",
self->host, self->port);
@ -206,23 +170,20 @@ static void ConnectRemoteObject(pRemoteOBJ self)
Default login with hard coded manager login. Defined in
nserver.c
*/
ANETwrite(self->readHandle,login,strlen(login));
ANETwrite(self->writeHandle,login,strlen(login));
ANETwrite(self->handle,login,strlen(login));
ANETwrite(self->transactHandle,login,strlen(login));
usleep(500);
ANETprocess();
/*
eat the login responses
*/
pPtr = ANETreadPtr(self->readHandle, &length);
ANETreadConsume(self->readHandle,length);
pPtr = ANETreadPtr(self->writeHandle, &length);
ANETreadConsume(self->writeHandle,length);
pPtr = ANETreadPtr(self->handle, &length);
ANETreadConsume(self->handle,length);
pPtr = ANETreadPtr(self->transactHandle, &length);
ANETreadConsume(self->transactHandle,length);
transactCommand(self->handle,"protocol set json\r\n", command,sizeof(command)-1);
/*
* install the read callback
*/
ANETsetReadCallback(self->readHandle,RemoteReadCallback, NULL, NULL);
/*
* Remove geterror on read nodes and reinstall callbacks for reconnects
@ -233,17 +194,15 @@ static void ConnectRemoteObject(pRemoteOBJ self)
node = FindHdbNode(NULL,rd.localNode,NULL);
if(node != NULL){
SetHdbProperty(node,"geterror",NULL);
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command));
snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->handle,command,strlen(command));
}
status = LLDnodePtr2Next(self->readList);
}
transactCommand(self->writeHandle,"protocol set withcode\r\n", command,sizeof(command));
self->connected = 1;
self->writeInUse = 0;
}
/*-----------------------------------------------------------------------------*/
static void MarkDisconnected(pRemoteOBJ self)
@ -270,8 +229,8 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
pUpdateCallback uppi = (pUpdateCallback)userData;
hdbDataMessage *mm = NULL;
pDynString text;
char *prefix = {"SROC:hupdate "};
char *postfix= {":EROC\r\n"};
char *prefix = {"hupdate "};
char *postfix= {" \r\n"};
char *txt = NULL;
int length;
pHdbPropertyChange propChange = NULL;
@ -309,7 +268,7 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
if(!SCisConnected(uppi->sendCon)){
return hdbKill;
}
length = strlen("SROC:hdelprop ") + strlen(uppi->remotePath) +
length = strlen("hdelprop ") + strlen(uppi->remotePath) +
strlen(propChange->key) + 10;
if(propChange->value != NULL){
length += strlen(propChange->value);
@ -319,10 +278,10 @@ static hdbCallbackReturn ROUpdateCallback(pHdb currentNode, void *userData,
return hdbContinue;
}
if(propChange->value == NULL){
snprintf(txt,length,"SROC:hdelprop %s %s %s", uppi->remotePath,
snprintf(txt,length,"hdelprop %s %s %s", uppi->remotePath,
propChange->key,postfix);
} else {
snprintf(txt,length,"SROC:hsetprop %s %s %s %s", uppi->remotePath,
snprintf(txt,length,"hsetprop %s %s %s %s", uppi->remotePath,
propChange->key,propChange->value, postfix);
}
SCWrite(uppi->sendCon,txt,eValue);
@ -397,7 +356,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Get information about the remote node and check compatability
*/
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){
/*
* try a reconnect,
@ -407,7 +366,7 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
*/
self->connected = 0;
ConnectRemoteObject(self);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host);
@ -448,9 +407,9 @@ static int ConnectRead(pRemoteOBJ self, SConnection * pCon, ReadData rd)
* Install a callback on the remote node to update the master. The remote should
* then immediatly send an update which will be processed by the read callback.
*/
snprintf(command,sizeof(command),"fulltransact addremotecb %s %s \r\n",
rd.remoteNode, rd.localNode);
ANETwrite(self->readHandle,command,strlen(command));
snprintf(command,sizeof(command),"contextdo %d addremotecb %s %s \r\n",
READACT, rd.remoteNode, rd.localNode);
ANETwrite(self->handle,command,strlen(command));
return 1;
}
@ -492,10 +451,10 @@ static int HeartbeatTask(void *pData)
{
pRemoteOBJ self = (pRemoteOBJ)pData;
int status;
char command[] = {"Poch\r\n"};
char command[] = {"contextdo 8437 Poch\r\n"};
if (time(NULL) > self->nextHeartbeat){
status = ANETwrite(self->readHandle,command, strlen(command));
status = ANETwrite(self->handle,command, strlen(command));
if(status != 1){
traceIO("RO","Trying a reconnect to %s, %d", self->host, self->port);
self->connected = 0;
@ -509,11 +468,13 @@ static int HeartbeatTask(void *pData)
return 1;
}
/*============================= writing related code ===========================
The logic here is to use the standard writeHandle when available. I expect most
communication to be short and to happen through the writeHandle. If that one is
in use, a new connection will be built.
---------------------------------------------------------------------------------
suppress all superfluous OK from the slave
This works by sending the command via contextdo with a ID > 10^6. This causes
the remote SICS to send the termination messages. The transaction IDs together
with the connection responsible for it are kept in a list.
This list is used by the write task to forward messages properly and for handling
termination.
-----------------------------------------------------------------------------------*/
#include <outcode.c>
static OutCode findOutCode(char *txt)
@ -527,93 +488,153 @@ static OutCode findOutCode(char *txt)
}
return eValue;
}
/*--------------------------------------------------------------------------------*/
static void printSICS(char *answer, SConnection *pCon)
/*-----------------------------------------------------------------------------------*/
static void CheckWriteList(int writeList,int transID, OutCode eOut, char *pText)
{
char line[1024], *pPtr, *pCode;
OutCode eCode;
int status;
writeData WD;
pPtr = answer;
while(pPtr != NULL){
memset(line,0,sizeof(line));
pPtr = stptok(pPtr,line,sizeof(line),"\n");
if(strstr(line,"OK") == NULL){
pCode = strstr(line,"@@");
if(pCode != NULL){
*pCode = '\0';
pCode += 2;
eCode = findOutCode(trim(pCode));
status = LLDnodePtr2First(writeList);
while(status == 1){
LLDblobData(writeList,&WD);
if(WD.transID == transID){
if(strstr(pText,"COMSTART") != NULL){
/* skip */
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 0) {
SCDeleteConnection(WD.pCon);
LLDblobDelete(writeList);
return;
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) {
/* skip */
return;
} else if(strstr(pText,"TASKSTART") != NULL){
WD.waitTask = 1 ;
LLDblobDelete(writeList);
LLDblobAppend(writeList,&WD, sizeof(writeData));
return;
} else if(strstr(pText,"TASKEND") != NULL && WD.waitTask == 1){
SCDeleteConnection(WD.pCon);
LLDblobDelete(writeList);
return;
} else {
eCode = eValue;
if(strstr(pText,"OK") == NULL){
SCWrite(WD.pCon,pText,eOut);
}
return;
}
SCWrite(pCon,line,eCode);
}
status = LLDnodePtr2Next(writeList);
}
}
/*---------------------------------------------------------------------------------*/
static int PrepareWriteHandle(pRemoteOBJ self, SConnection *pCon, int *newHandle)
}
/*-----------------------------------------------------------------------------------*/
static int WriteResponseTask(void *pData)
{
int handle, length;
char *answer = NULL;
char command[80];
pRemoteOBJ self = (pRemoteOBJ)pData;
int status, length = 0, transID;
char *pText, *outTxt;
json_object *message = NULL, *data = NULL;
enum json_tokener_error tokerr;
OutCode eOut;
writeData WD;
if(self->writeInUse) {
handle = ANETconnect(self->host,self->port);
if(handle < 0){
traceIO("RO","Failed to connect to %s at %d", self->host, self->port);
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Failed to connect to %s %d", self->host, self->port);
}
return handle;
}
ANETwrite(handle,login,strlen(login));
usleep(500);
ANETprocess();
/*
eat the login responses
*/
answer = ANETreadPtr(handle, &length);
ANETreadConsume(handle,length);
*newHandle = 1;
transactCommand(handle,"protocol set withcode\r\n", command,sizeof(command));
} else {
self->writeInUse = 1;
handle = self->writeHandle;
/*
eat dirt from the line
*/
answer = ANETreadPtr(handle, &length);
ANETreadConsume(handle,length);
if(!ANETvalidHandle(self->handle)) {
return 1;
}
return handle;
pText = ANETreadPtr(self->handle,&length);
while(length > 0){
json_tokener_reset(self->jtok);
message = json_tokener_parse_ex(self->jtok,pText,length);
tokerr = self->jtok->err;
if(tokerr == json_tokener_continue){
return 1;
} else if(tokerr != json_tokener_success) {
traceIO("RO","JSON parsing error %s on %s from %s %d",
json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset);
ANETreadConsume(self->handle,length);
return 1;
}
if(json_object_get_type(message) != json_type_object) {
traceIO("RO","Received JSON of bad type in %s from %s",pText,self->host);
ANETreadConsume(self->handle,length);
return 1;
}
/*
we need to consume here what has been parsed.
The char_offset in the tokenizer structure might tell us that...
*/
ANETreadConsume(self->handle,self->jtok->char_offset);
/*
Received a valid message, process
*/
data = json_object_object_get(message,"trans");
if(data == NULL){
traceIO("RO","No transaction ID found in %s from %s", pText,self->host);
return 1;
}
transID = json_object_get_int(data);
data = json_object_object_get(message,"flag");
if(data == NULL){
traceIO("RO","No flag found in %s from %s", pText,self->host);
return 1;
}
outTxt = (char *)json_object_get_string(data);
eOut = findOutCode(outTxt);
data = json_object_object_get(message,"data");
if(data == NULL){
traceIO("RO","No data found in %s from %s", pText,self->host);
return 1;
}
pText = (char *)json_object_get_string(data);
traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);
/*
do nothing on Poch
*/
if(transID == POCHACT){
pText = ANETreadPtr(self->handle,&length);
json_object_put(message);
continue;
}
/*
process update messages
*/
if(transID == READACT){
if(strstr(pText,"hupdate") != NULL || strstr(pText,"prop") != NULL){
InterpExecute(pServ->pSics,pServ->dummyCon,pText);
}
traceIO("RO","Received %s from remote",pText);
pText = ANETreadPtr(self->handle,&length);
json_object_put(message);
continue;
}
/*
check write List
*/
CheckWriteList(self->writeList,transID,eOut,pText);
json_object_put(message);
pText = ANETreadPtr(self->handle,&length);
}
return 1;
}
/*---------------------------------------------------------------------------------*/
static void ProcessWriteResponse(pRemoteOBJ self, int handle, SConnection *pCon)
static int IncrementTransactionID()
{
char *answer = NULL, *pEnd, *command = NULL;
int length;
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s", self->host);
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
printSICS(answer,pCon);
}
traceIO("RO","%s:%d: Received %s", self->host, self->port,answer);
ANETreadConsume(handle,length);
break;
}
transactionID++;
if(transactionID >= 200000){
transactionID = 100000;
}
return transactionID;
}
/*---------------------------------------------------------------------------------*/
static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
@ -626,14 +647,10 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
pDynString data;
char *remoteNode;
char *command, *answer, *pEnd;
writeData WD;
if((mm = GetHdbSetMessage(mes)) != NULL){
pCon = (SConnection *)mm->callData;
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return hdbAbort;
}
/*
build the command to send
@ -648,53 +665,32 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
}
return hdbAbort;
}
snprintf(command,length,"transact hset %s %s\r\n",remoteNode, GetCharArray(data));
WD.pCon = SCCopyConnection(pCon);
WD.waitTask = 0;
WD.transID = IncrementTransactionID();
snprintf(command,length,"contextdo %d hset %s %s\r\n",
WD.transID, remoteNode, GetCharArray(data));
/*
write
*/
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
status = ANETwrite(handle,command,strlen(command));
LLDblobAppend(self->writeList,&WD,sizeof(writeData));
status = ANETwrite(self->handle,command,strlen(command));
free(command);
DeleteDynString(data);
if(status < 0){
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected", remoteNode, self->host);
}
return hdbAbort;
}
/*
wait for a response: TRANSACTIONFINISHED
*/
ProcessWriteResponse(self,handle,pCon);
/*
Is there a termination script?
*/
command = GetHdbProp(currentNode,"termscript");
if(command != NULL){
while(1) {
TaskYield(pServ->pTasker);
Tcl_Eval(InterpGetTcl(pServ->pSics),command);
answer = (char *)Tcl_GetStringResult(InterpGetTcl(pServ->pSics));
if(strstr(answer,"idle") != NULL){
answer = ANETreadPtr(handle,&length);
printSICS(answer,pCon);
traceIO("RO","%s:%d:Received %s", self->host,self->port,answer);
ANETreadConsume(handle,length);
break;
}
self->connected = 0;
ConnectRemoteObject(self);
if(self->connected == 0){
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: remote %s on %s disconnected",
remoteNode, self->host);
}
return hdbAbort;
}
}
if(newHandle){
ANETclose(handle);
} else {
self->writeInUse = 0;
}
return hdbContinue;
}
return hdbContinue;
@ -722,12 +718,17 @@ static int ConnectWrite(pRemoteOBJ self, SConnection *pCon, ReadData rd)
SetHdbProperty(localNode,"remotewrite",rd.remoteNode);
AppendHipadabaCallback(localNode, MakeHipadabaCallback(ROWriteCallback,
self,NULL));
/*
TODO: The connected write nodes should be held in a list in order to be able to
remove the write callbacks when deleting the remote object. As removing remote
objects usually only happens when SICS shuts down this is not so important.
*/
/*
* Get information about the remote node and check compatability
*/
snprintf(command,sizeof(command),"hinfo %s\r\n", rd.remoteNode);
status = transactCommand(self->writeHandle,command,reply,sizeof(reply));
status = transactCommand(self->transactHandle,command,reply,sizeof(reply));
if(status != 1){
SCPrintf(pCon,eWarning,"WARNING: cannot yet reach slave %s, but continuing...",
self->host);
@ -784,61 +785,21 @@ static int ConnectwriteCmd(pSICSOBJ ccmd, SConnection * pCon,
/*============================ remote execute =================================*/
static int RemoteExecute(pRemoteOBJ self, SConnection *pCon, char *command)
{
int status, handle, newHandle = 0, length;
char *answer, *pEnd;
handle = PrepareWriteHandle(self,pCon,&newHandle);
if(handle < 0){
return 0;
}
int status;
char answer[65536];
/*
write, thereby taking care to prefix with transact and for proper termination
*/
if(strstr(command,"transact") == NULL){
ANETwrite(handle,"transact ", sizeof("transact "));
}
status = ANETwrite(handle,command,strlen(command));
if(strstr(command,"\n") == NULL){
ANETwrite(handle,"\r\n",2);
}
if(status < 0){
traceIO("RO","Disconnect from %s while executing %s", self->host, command);
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
return 0;
}
/*
wait for response
*/
while(1){
TaskYield(pServ->pTasker);
if(!ANETvalidHandle(handle)){
if(pCon != NULL){
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
break;
}
answer = ANETreadPtr(handle,&length);
if(length > 0 && (pEnd = strstr(answer,"TRANSACTIONFINISHED")) != NULL){
if(pCon != NULL){
*pEnd = '\0';
SCPrintf(pCon,eValue,answer);
}
ANETreadConsume(handle,length);
break;
}
}
if(newHandle){
ANETclose(handle);
memset(answer,0,sizeof(answer)-1);
status = transactCommand(self->transactHandle,command,answer,sizeof(answer));
if(status == 1){
SCWrite(pCon,answer,eValue);
} else {
self->writeInUse = 0;
SCPrintf(pCon,eError,"ERROR: Disconnected from %s %d", self->host, self->port);
}
return 1;
return status;
}
/*------------------------------------------------------------------------------*/
static int RemoteExecuteCmd(pSICSOBJ ccmd, SConnection * pCon,
@ -920,6 +881,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
self->host = strdup(argv[2]);
self->port = atoi(argv[3]);
self->readList = LLDblobCreate();
self->writeList = LLDblobCreate();
self->jtok = json_tokener_new();
ConnectRemoteObject(self);
cmd = AddSICSHdbPar(pNew->objectNode,
@ -949,6 +912,8 @@ static int MakeRemoteObject(SConnection *pCon, SicsInterp *pSics, void *pData,
snprintf(roTaskName,sizeof(roTaskName),"ro-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, HeartbeatTask, NULL,NULL,self,1);
snprintf(roTaskName,sizeof(roTaskName),"rocom-%s-%d", self->host, self->port);
TaskRegisterN(pServ->pTasker, roTaskName, WriteResponseTask, NULL,NULL,self,1);
return status;
}

View File

@ -553,7 +553,7 @@ static char *SctActionHandler(void *actionData, char *lastReply,
} else {
l = strlen(origScript);
}
snprintf(eprop, sizeof eprop, "error_in_%.*s", l, origScript);
snprintf(eprop, sizeof eprop, "error_in_%s", origScript);
emsg = GetHdbProp(node, eprop);
cnt = 0;
if (emsg != NULL) {

View File

@ -477,7 +477,6 @@ int InvokeSICSOBJ(SConnection * pCon, SicsInterp * pSics, void *pData,
status = GetHdbProperty(parNode,"geterror",buffer,sizeof(buffer));
if (status == 1 && strstr(buffer,"none") == NULL){
SCPrintf(pCon,eValue,"ERROR: %s on last read of %s", buffer, argv[0]);
SCPrintf(pCon,eValue,"%s = -99999", argv[0]);
return 0;
}
status = GetHipadabaPar(parNode, &data, pCon);

View File

@ -1,14 +1,14 @@
/*
** stptok() -- public domain by Ray Gardner, modified by Bob Stout
**
** You pass this function a string to parse, a buffer to receive the
** "token" that gets scanned, the length of the buffer, and a string of
** "break" characters that stop the scan. It will copy the string into
** the buffer up to any of the break characters, or until the buffer is
** full, and will always leave the buffer null-terminated. It will
** return a pointer to the first non-breaking character after the one
** that stopped the scan.
*/
/*
** stptok() -- public domain by Ray Gardner, modified by Bob Stout
**
** You pass this function a string to parse, a buffer to receive the
** "token" that gets scanned, the length of the buffer, and a string of
** "break" characters that stop the scan. It will copy the string into
** the buffer up to any of the break characters, or until the buffer is
** full, and will always leave the buffer null-terminated. It will
** return a pointer to the first non-breaking character after the one
** that stopped the scan.
*/
#ifndef STPSTPTOK
#define STPSTPTOK
char *stptok(const char *s, char *tok, size_t toklen, char *brk);