send version message to client if it is on a new protocol version

This commit is contained in:
Jeff Hill
2002-06-25 18:31:13 +00:00
parent c78c3ce853
commit 96aab4a62e
5 changed files with 157 additions and 42 deletions
+40 -7
View File
@@ -507,7 +507,7 @@ LOCAL void read_reply ( void *pArg, struct dbAddr *paddr,
"server unable to load read (or subscription update) response into protocol buffer PV=\"%s\" max bytes=%u",
RECORD_NAME ( paddr ), rsrvSizeofLargeBufTCP );
if ( ! eventsRemaining )
cas_send_msg ( pClient, ! pevext->send_lock );
cas_send_bs_msg ( pClient, ! pevext->send_lock );
if ( pevext->send_lock )
SEND_UNLOCK ( pClient );
return;
@@ -519,7 +519,7 @@ LOCAL void read_reply ( void *pArg, struct dbAddr *paddr,
if ( ! asCheckGet ( pciu->asClientPVT ) ) {
no_read_access_event ( pClient, pevext );
if ( ! eventsRemaining )
cas_send_msg ( pClient, !pevext->send_lock );
cas_send_bs_msg ( pClient, !pevext->send_lock );
if ( pevext->send_lock ) {
SEND_UNLOCK ( pClient );
}
@@ -595,7 +595,7 @@ LOCAL void read_reply ( void *pArg, struct dbAddr *paddr,
* them up like db requests when the OPI does not keep up.
*/
if ( ! eventsRemaining )
cas_send_msg ( pClient, ! pevext->send_lock );
cas_send_bs_msg ( pClient, ! pevext->send_lock );
if ( pevext->send_lock )
SEND_UNLOCK ( pClient );
@@ -1459,7 +1459,7 @@ void write_notify_reply(void *pArg)
ppnb->busy = FALSE;
}
cas_send_msg ( pClient, FALSE );
cas_send_bs_msg ( pClient, FALSE );
SEND_UNLOCK ( pClient );
@@ -1923,10 +1923,43 @@ LOCAL void search_fail_reply ( caHdrLargeArray *mp, void *pPayload, struct clien
}
/*
* udp_noop_action()
* udp_version_action()
*/
LOCAL int udp_noop_action ( caHdrLargeArray *mp, void *pPayload, struct client *client )
LOCAL int udp_version_action ( caHdrLargeArray *mp, void *pPayload, struct client *client )
{
if ( mp->m_count != 0 ) {
client->minor_version_number = mp->m_count;
if ( CA_V411 ( mp->m_count ) ) {
client->seqNoOfReq = mp->m_cid;
}
else {
client->seqNoOfReq = 0;
}
}
return RSRV_OK;
}
/*
* rsrv_version_reply()
*/
int rsrv_version_reply ( struct client *client )
{
int success;
SEND_LOCK ( client );
/*
* sequence number is specified zero when we copy in the
* header because we dont know it until we receive a datagram
* from the client
*/
success = cas_copy_in_header ( client, CA_PROTO_VERSION,
0, 0, CA_MINOR_PROTOCOL_REVISION,
0, 0, 0 );
if ( ! success ) {
SEND_UNLOCK ( client );
return RSRV_ERROR;
}
cas_commit_msg ( client, 0 );
SEND_UNLOCK ( client );
return RSRV_OK;
}
@@ -2084,7 +2117,7 @@ LOCAL const pProtoStubTCP tcpJumpTable[] =
typedef int (*pProtoStubUDP) (caHdrLargeArray *mp, void *pPayload, struct client *client);
LOCAL const pProtoStubUDP udpJumpTable[] =
{
udp_noop_action,
udp_version_action,
bad_udp_cmd_action,
bad_udp_cmd_action,
bad_udp_cmd_action,
+3 -3
View File
@@ -67,7 +67,7 @@ void camsgtask ( struct client *client )
destroy_tcp_client ( client );
return;
}
cas_send_msg ( client, TRUE );
cas_send_bs_msg ( client, TRUE );
while ( TRUE ) {
client->recv.stk = 0;
@@ -143,10 +143,10 @@ void camsgtask ( struct client *client )
if (status < 0) {
errlogPrintf("CAS: io ctl err - %s\n",
SOCKERRSTR(SOCKERRNO));
cas_send_msg(client, TRUE);
cas_send_bs_msg(client, TRUE);
}
else if (nchars == 0){
cas_send_msg(client, TRUE);
cas_send_bs_msg(client, TRUE);
}
}
+96 -25
View File
@@ -44,11 +44,11 @@ typedef unsigned long arrayElementCount;
#include "server.h"
/*
* cas_send_msg()
* cas_send_bs_msg()
*
* (channel access server send message)
*/
void cas_send_msg ( struct client *pclient, int lock_needed )
void cas_send_bs_msg ( struct client *pclient, int lock_needed )
{
int status;
@@ -69,8 +69,7 @@ void cas_send_msg ( struct client *pclient, int lock_needed )
}
while ( pclient->send.stk ) {
status = sendto ( pclient->sock, pclient->send.buf, pclient->send.stk, 0,
(struct sockaddr *)&pclient->addr, sizeof(pclient->addr) );
status = send ( pclient->sock, pclient->send.buf, pclient->send.stk, 0 );
if ( status >= 0 ) {
unsigned transferSize = (unsigned) status;
if ( transferSize >= pclient->send.stk ) {
@@ -95,28 +94,17 @@ void cas_send_msg ( struct client *pclient, int lock_needed )
ipAddrToDottedIP ( &pclient->addr, buf, sizeof(buf) );
if(pclient->proto == IPPROTO_TCP) {
if ( (anerrno!=SOCK_ECONNABORTED&&
anerrno!=SOCK_ECONNRESET&&
anerrno!=SOCK_EPIPE&&
anerrno!=SOCK_ETIMEDOUT)||
CASDEBUG>2){
if ( (anerrno!=SOCK_ECONNABORTED&&
anerrno!=SOCK_ECONNRESET&&
anerrno!=SOCK_EPIPE&&
anerrno!=SOCK_ETIMEDOUT)||
CASDEBUG>2){
errlogPrintf (
"CAS: TCP send to \"%s\" failed because \"%s\"\n",
buf, SOCKERRSTR(anerrno));
}
pclient->disconnect = TRUE;
}
else if (pclient->proto == IPPROTO_UDP) {
errlogPrintf(
"CAS: UDP send to \"%s\" failed because \"%s\"\n",
(int)buf,
(int)SOCKERRSTR(anerrno));
}
else {
assert (0);
errlogPrintf (
"CAS: TCP send to \"%s\" failed because \"%s\"\n",
buf, SOCKERRSTR(anerrno));
}
pclient->disconnect = TRUE;
pclient->send.stk = 0u;
break;
}
@@ -131,6 +119,81 @@ void cas_send_msg ( struct client *pclient, int lock_needed )
return;
}
/*
* cas_send_dg_msg()
*
* (channel access server send udp message)
*/
void cas_send_dg_msg ( struct client * pclient )
{
int status;
int sizeDG;
char * pDG;
caHdr * pMsg;
if ( CASDEBUG > 2 && pclient->send.stk ) {
errlogPrintf ( "CAS: Sending a udp message of %d bytes\n", pclient->send.stk );
}
SEND_LOCK ( pclient );
if ( pclient->send.stk <= sizeof (caHdr) ) {
SEND_UNLOCK(pclient);
return;
}
pDG = pclient->send.buf;
pMsg = ( caHdr * ) pDG;
sizeDG = pclient->send.stk;
assert ( ntohs ( pMsg->m_cmmd ) == CA_PROTO_VERSION );
if ( CA_V411 ( pclient->minor_version_number ) ) {
pMsg->m_cid = htonl ( pclient->seqNoOfReq );
pMsg->m_dataType = htons ( sequenceNoIsValid );
}
else {
pDG += sizeof (caHdr);
sizeDG -= sizeof (caHdr);
}
status = sendto ( pclient->sock, pDG, sizeDG, 0,
(struct sockaddr *)&pclient->addr, sizeof(pclient->addr) );
if ( status >= 0 ) {
unsigned transferSize = (unsigned) status;
if ( transferSize >= sizeDG ) {
epicsTimeGetCurrent ( &pclient->time_at_last_send );
}
else {
errlogPrintf (
"cas: system failed to send entire udp frame?\n" );
}
}
else {
int anerrno = SOCKERRNO;
char buf[64];
ipAddrToDottedIP ( &pclient->addr, buf, sizeof(buf) );
errlogPrintf(
"CAS: UDP send to \"%s\" "
"failed because \"%s\"\n",
(int)buf,
(int)SOCKERRSTR(anerrno));
}
pclient->send.stk = 0u;
/*
* add placeholder for the first version message should it be needed
*/
rsrv_version_reply ( prsrv_cast_client );
SEND_UNLOCK(pclient);
DLOG ( 3, ( "------------------------------\n\n" ) );
return;
}
/*
*
* cas_copy_in_header()
@@ -177,7 +240,15 @@ int cas_copy_in_header (
pclient->send.stk = 0;
}
else{
cas_send_msg ( pclient, FALSE );
if ( pclient->proto == IPPROTO_TCP) {
cas_send_bs_msg ( pclient, FALSE );
}
else if ( pclient->proto == IPPROTO_UDP ) {
cas_send_dg_msg ( pclient );
}
else {
return FALSE;
}
}
}
+12 -4
View File
@@ -242,6 +242,11 @@ int cast_server(void)
}
casAttachThreadToClient ( prsrv_cast_client );
/*
* add placeholder for the first version message should it be needed
*/
rsrv_version_reply ( prsrv_cast_client );
while (TRUE) {
status = recvfrom (
@@ -261,18 +266,21 @@ int cast_server(void)
prsrv_cast_client->recv.stk = 0ul;
epicsTimeGetCurrent(&prsrv_cast_client->time_at_last_recv);
prsrv_cast_client->minor_version_number = 0;
prsrv_cast_client->seqNoOfReq = 0;
/*
* If we are talking to a new client flush to the old one
* in case we are holding UDP messages waiting to
* see if the next message is for this same client.
*/
if (prsrv_cast_client->send.stk) {
if (prsrv_cast_client->send.stk>sizeof(caHdr)) {
status = memcmp( (void *)&prsrv_cast_client->addr, (void *)&new_recv_addr, recv_addr_size);
if(status){
/*
* if the address is different
*/
cas_send_msg(prsrv_cast_client, TRUE);
cas_send_dg_msg(prsrv_cast_client);
prsrv_cast_client->addr = new_recv_addr;
}
}
@@ -327,11 +335,11 @@ int cast_server(void)
status = socket_ioctl(IOC_cast_sock, FIONREAD, &nchars);
if (status<0) {
errlogPrintf ("CA cast server: Unable to fetch N characters pending\n");
cas_send_msg (prsrv_cast_client, TRUE);
cas_send_dg_msg (prsrv_cast_client);
clean_addrq ();
}
else if (nchars == 0) {
cas_send_msg (prsrv_cast_client, TRUE);
cas_send_dg_msg (prsrv_cast_client);
clean_addrq ();
}
}
+6 -3
View File
@@ -41,7 +41,7 @@
#include "asLib.h"
#include "dbAddr.h"
#include "dbNotify.h"
#define CA_MINOR_PROTOCOL_REVISION 10
#define CA_MINOR_PROTOCOL_REVISION 11
#include "caProto.h"
#include "ellLib.h"
#include "epicsTime.h"
@@ -99,11 +99,12 @@ typedef struct client {
void *evuser;
char *pUserName;
char *pHostName;
epicsEventId blockSem; /* used whenever the client blocks */
epicsEventId blockSem; /* used whenever the client blocks */
SOCKET sock;
int proto;
epicsThreadId tid;
unsigned minor_version_number;
ca_uint32_t seqNoOfReq; /* for udp */
unsigned recvBytesToDrain;
unsigned priority;
char disconnect; /* disconnect detected */
@@ -199,7 +200,8 @@ GLBLTYPE unsigned rsrvSizeofLargeBufTCP;
#define UNLOCK_CLIENTQ epicsMutexUnlock (clientQlock);
void camsgtask (struct client *client);
void cas_send_msg (struct client *pclient, int lock_needed);
void cas_send_bs_msg ( struct client *pclient, int lock_needed );
void cas_send_dg_msg ( struct client *pclient );
int rsrv_online_notify_task (void);
int cast_server (void);
struct client *create_client ();
@@ -210,6 +212,7 @@ void casAttachThreadToClient ( struct client * );
int camessage ( struct client *client );
void write_notify_reply ( void *pArg );
int rsrvCheckPut ( const struct channel_in_use *pciu );
int rsrv_version_reply ( struct client *client );
/*
* inclming protocol maintetnance