Now, I fiddling with the message writitng code, I get all remote object

messages but in the wrong order. They are sent in the right order, though.
Have to chnage the code to use one connection asynchronously for everything only.
This commit is contained in:
2015-05-04 08:02:37 +02:00
parent 5d0e05d77b
commit aef2a36b60
2 changed files with 38 additions and 10 deletions

View File

@ -74,6 +74,15 @@
#include "sicshipadaba.h" #include "sicshipadaba.h"
#include "protocol.h" #include "protocol.h"
#include "sicsvar.h" #include "sicsvar.h"
#include <json/json.h>
/*
Greetings from protocol.c for SCLogWrite...
*/
extern struct json_object *mkJSON_Object(SConnection * pCon, char *pBuffer,
int iOut);
/* /*
#define UUDEB 1 #define UUDEB 1
define UUDEB , for buffer writing for checking encoding */ define UUDEB , for buffer writing for checking encoding */
@ -1082,13 +1091,14 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
{ {
char pBueffel[1024]; char pBueffel[1024];
char *pPtr; char *pPtr;
json_object *myJson = NULL;
/* for commandlog tail */ /* for commandlog tail */
if (!VerifyConnection(self)) { if (!VerifyConnection(self)) {
return 0; return 0;
} }
if(self->iProtocolID == 5) { if(self->iProtocolID == 4) { /* act */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1100,6 +1110,12 @@ int SCPureSockWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == 3) {
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else { } else {
testAndWriteSocket(self, buffer, iOut); testAndWriteSocket(self, buffer, iOut);
} }
@ -1113,6 +1129,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
{ {
char pBueffel[1024]; char pBueffel[1024];
char *pPtr; char *pPtr;
json_object *myJson = NULL;
if (!VerifyConnection(self)) { if (!VerifyConnection(self)) {
return 0; return 0;
@ -1121,7 +1138,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
WriteToCommandLogId(NULL, self->sockHandle, buffer); WriteToCommandLogId(NULL, self->sockHandle, buffer);
SetSendingConnection(NULL); SetSendingConnection(NULL);
if(self->iProtocolID == 5) { if(self->iProtocolID == 4) { /* act */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1133,7 +1150,7 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == 2) { } else if(self->iProtocolID == 2) { /* withcode */
if (strlen(buffer) + 30 > 1024) { if (strlen(buffer) + 30 > 1024) {
pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char)); pPtr = (char *) malloc((strlen(buffer) + 30) * sizeof(char));
memset(pPtr, 0, strlen(buffer) + 20); memset(pPtr, 0, strlen(buffer) + 20);
@ -1145,6 +1162,12 @@ int SCLogWrite(SConnection * self, char *buffer, int iOut)
if(pPtr != pBueffel){ if(pPtr != pBueffel){
free(pPtr); free(pPtr);
} }
} else if(self->iProtocolID == 3) { /* json */
myJson = mkJSON_Object(self,buffer,iOut);
if(myJson != NULL){
SCDoSockWrite(self,(char *)json_object_to_json_string(myJson));
json_object_put(myJson);
}
} else { } else {
testAndWriteSocket(self, buffer, iOut); testAndWriteSocket(self, buffer, iOut);
} }

View File

@ -17,7 +17,7 @@
* *
* COPRYRIGHT: see file COPYRIGHT * COPRYRIGHT: see file COPYRIGHT
* *
* Mark Koennecke, February 2015 * Mark Koennecke, February-May 2015
**/ **/
#include <unistd.h> #include <unistd.h>
#include <time.h> #include <time.h>
@ -209,8 +209,8 @@ static void ConnectRemoteObject(pRemoteOBJ self)
return; return;
} }
self->readHandle = ANETconnect(self->host, self->port);
self->writeHandle = ANETconnect(self->host, self->port); self->writeHandle = ANETconnect(self->host, self->port);
self->readHandle = ANETconnect(self->host, self->port);
self->transactHandle = ANETconnect(self->host, self->port); self->transactHandle = ANETconnect(self->host, self->port);
if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){ if(self->readHandle < 0 || self->writeHandle < 0 || self->transactHandle < 0){
self->connected = 0; self->connected = 0;
@ -566,15 +566,15 @@ static int WriteResponseTask(void *pData)
} }
pText = ANETreadPtr(self->writeHandle,&length); pText = ANETreadPtr(self->writeHandle,&length);
if(length > 0){ while(length > 0){
json_tokener_reset(self->jtok); json_tokener_reset(self->jtok);
message = json_tokener_parse_ex(self->jtok,pText,length); message = json_tokener_parse_ex(self->jtok,pText,length);
tokerr = self->jtok->err; tokerr = self->jtok->err;
if(tokerr == json_tokener_continue){ if(tokerr == json_tokener_continue){
return 1; return 1;
} else if(tokerr != json_tokener_success) { } else if(tokerr != json_tokener_success) {
traceIO("RO","JSON parsing error %s on %s from %s", traceIO("RO","JSON parsing error %s on %s from %s %d",
json_tokener_errors[tokerr], pText, self->host); json_tokener_errors[tokerr], pText, self->host, self->jtok->char_offset);
ANETreadConsume(self->writeHandle,length); ANETreadConsume(self->writeHandle,length);
return 1; return 1;
} }
@ -614,6 +614,8 @@ static int WriteResponseTask(void *pData)
return 1; return 1;
} }
pText = (char *)json_object_get_string(data); pText = (char *)json_object_get_string(data);
traceIO("RO","Received:%s:%d:%d:%s",self->host,transID,eOut,pText);
status = LLDnodePtr2First(self->writeList); status = LLDnodePtr2First(self->writeList);
while(status == 1){ while(status == 1){
@ -625,6 +627,9 @@ static int WriteResponseTask(void *pData)
SCDeleteConnection(WD.pCon); SCDeleteConnection(WD.pCon);
LLDblobDelete(self->writeList); LLDblobDelete(self->writeList);
return 1; return 1;
} else if(strstr(pText,"COMEND") != NULL && WD.waitTask == 1) {
/* skip */
return 1;
} else if(strstr(pText,"TASKSTART") != NULL){ } else if(strstr(pText,"TASKSTART") != NULL){
WD.waitTask = 1 ; WD.waitTask = 1 ;
LLDblobDelete(self->writeList); LLDblobDelete(self->writeList);
@ -641,7 +646,7 @@ static int WriteResponseTask(void *pData)
} }
status = LLDnodePtr2Next(self->writeList); status = LLDnodePtr2Next(self->writeList);
} }
pText = ANETreadPtr(self->writeHandle,&length);
} }
return 1; return 1;
@ -695,6 +700,7 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
write write
*/ */
traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command); traceIO("RO","%s:%d: Sending %s ", self->host, self->port, command);
LLDblobAppend(self->writeList,&WD,sizeof(writeData));
status = ANETwrite(self->writeHandle,command,strlen(command)); status = ANETwrite(self->writeHandle,command,strlen(command));
free(command); free(command);
DeleteDynString(data); DeleteDynString(data);
@ -704,7 +710,6 @@ static hdbCallbackReturn ROWriteCallback(pHdb currentNode, void *userData,
} }
return hdbAbort; return hdbAbort;
} }
LLDblobAppend(self->writeList,&WD,sizeof(writeData));
return hdbContinue; return hdbContinue;
} }