get rid of writeRaw for asyn 4.10

This commit is contained in:
zimoch
2008-09-16 07:24:24 +00:00
parent b307adb005
commit e83ceda41c

View File

@ -59,12 +59,12 @@ writeRequest()
pasynManager->queueRequest() pasynManager->queueRequest()
when request is handled when request is handled
pasynOctet->flush() pasynOctet->flush()
pasynOctet->writeRaw() pasynOctet->write()
if writeRaw() times out if write() times out
writeCallback(StreamIoTimeout) writeCallback(StreamIoTimeout)
if writeRaw fails otherwise if write fails otherwise
writeCallback(StreamIoFault) writeCallback(StreamIoFault)
if writeRaw succeeds and all bytes have been written if write succeeds and all bytes have been written
writeCallback(StreamIoSuccess) writeCallback(StreamIoSuccess)
if not all bytes can be written if not all bytes can be written
pasynManager->queueRequest() to write next part pasynManager->queueRequest() to write next part
@ -605,15 +605,27 @@ writeHandler()
size_t streameoslen; size_t streameoslen;
const char* streameos = getOutTerminator(streameoslen); const char* streameos = getOutTerminator(streameoslen);
if (streameos) // stream has added eos int oldeoslen = -1;
char oldeos[16];
if (streameos) // stream has already added eos, don't do it again in asyn
{ {
status = pasynOctet->writeRaw(pvtOctet, pasynUser, // clear terminator for asyn
outputBuffer, outputSize, &written); status = pasynOctet->getOutputEos(pvtOctet,
pasynUser, oldeos, sizeof(oldeos)-1, &oldeoslen);
if (status != asynSuccess)
{
oldeoslen = -1;
// No EOS support?
}
pasynOctet->setOutputEos(pvtOctet, pasynUser,
NULL, 0);
} }
else // asyn should add eos status = pasynOctet->write(pvtOctet, pasynUser,
outputBuffer, outputSize, &written);
if (oldeoslen >= 0) // restore asyn terminator
{ {
status = pasynOctet->write(pvtOctet, pasynUser, pasynOctet->setOutputEos(pvtOctet, pasynUser,
outputBuffer, outputSize, &written); oldeos, oldeoslen);
} }
switch (status) switch (status)
{ {
@ -773,7 +785,7 @@ readHandler()
pasynUser->timeout = replyTimeout; pasynUser->timeout = replyTimeout;
} }
bool waitForReply = true; bool waitForReply = true;
int received; size_t received;
int eomReason; int eomReason;
asynStatus status; asynStatus status;
long readMore; long readMore;
@ -785,7 +797,7 @@ readHandler()
eomReason = 0; eomReason = 0;
status = pasynOctet->read(pvtOctet, pasynUser, status = pasynOctet->read(pvtOctet, pasynUser,
buffer, bytesToRead, (size_t*)&received, &eomReason); buffer, bytesToRead, &received, &eomReason);
if (ioAction == Read || status != asynTimeout) if (ioAction == Read || status != asynTimeout)
{ {
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
@ -804,8 +816,8 @@ readHandler()
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
"AsyncRead poll: received %d of %d bytes \"%s\" " "AsyncRead poll: received %d of %d bytes \"%s\" "
"eomReason=%s [data ignored]\n", "eomReason=%s [data ignored]\n",
clientName(), received, bytesToRead, clientName(), (int)received, bytesToRead,
StreamBuffer(buffer, received).expand()(), StreamBuffer(buffer, (int)received).expand()(),
eomReasonStr[eomReason&0x7]); eomReasonStr[eomReason&0x7]);
#endif #endif
// ignore what we got from here. // ignore what we got from here.
@ -818,8 +830,8 @@ readHandler()
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
"received %d of %d bytes \"%s\" " "received %d of %d bytes \"%s\" "
"eomReason=%s\n", "eomReason=%s\n",
clientName(), received, bytesToRead, clientName(), (int)received, bytesToRead,
StreamBuffer(buffer, received).expand()(), StreamBuffer(buffer, (int)received).expand()(),
eomReasonStr[eomReason&0x7]); eomReasonStr[eomReason&0x7]);
#endif #endif
// asynOctet->read() cuts off terminator, but: // asynOctet->read() cuts off terminator, but:
@ -839,7 +851,7 @@ readHandler()
size_t i; size_t i;
for (i = 0; i < deveoslen; i++, received++) for (i = 0; i < deveoslen; i++, received++)
{ {
if (received >= 0) buffer[received] = deveos[i]; if ((int)received >= 0) buffer[received] = deveos[i];
// It is safe to add to buffer here, because // It is safe to add to buffer here, because
// the terminator was already there before // the terminator was already there before
// asynOctet->read() had cut it. // asynOctet->read() had cut it.
@ -877,7 +889,7 @@ readHandler()
debug("AsynDriverInterface::readHandler(%s): " debug("AsynDriverInterface::readHandler(%s): "
"ioAction=%s, timeout after %d of %d bytes \"%s\"\n", "ioAction=%s, timeout after %d of %d bytes \"%s\"\n",
clientName(), ioActionStr[ioAction], clientName(), ioActionStr[ioAction],
received, bytesToRead, (int)received, bytesToRead,
StreamBuffer(buffer, received).expand()()); StreamBuffer(buffer, received).expand()());
#endif #endif
if (ioAction == AsyncRead || ioAction == AsyncReadMore) if (ioAction == AsyncRead || ioAction == AsyncReadMore)
@ -944,28 +956,11 @@ void intrCallbackOctet(void* /*pvt*/, asynUser *pasynUser,
// Problems here: // Problems here:
// 1. We get this message too when we are the poller. // 1. We get this message too when we are the poller.
// Thus we have to ignore what we got from polling. // Thus we have to ignore what we got from polling.
// 2. We get this message multiple times when original reader // 2. eomReason=ASYN_EOM_CNT when message was too long for
// reads in chunks.
// 3. eomReason=ASYN_EOM_CNT when message was too long for
// internal buffer of asynDriver. // internal buffer of asynDriver.
if (!interruptAccept) return; // too early to process records if (!interruptAccept) return; // too early to process records
debug("AsynDriverInterface::intrCallbackOctet(%s) ioAction = %s\n", interface->asynReadHandler(data, numchars, eomReason);
interface->clientName(), ioActionStr[interface->ioAction]);
if (interface->ioAction == AsyncRead ||
interface->ioAction == AsyncReadMore)
{
interface->asynReadHandler(data, numchars, eomReason);
}
else
{
#ifndef NO_TEMPORARY
debug("AsynDriverInterface::intrCallbackOctet(%s, buffer=\"%s\", "
"received=%d eomReason=%s) ioAction=%s\n",
interface->clientName(), StreamBuffer(data, numchars).expand()(),
numchars, eomReasonStr[eomReason&0x7], ioActionStr[interface->ioAction]);
#endif
}
} }
// get asynchronous input // get asynchronous input