Read / write sockets ("full-duplex") at the same time on Linux (aio specially)

I am migrating an application based on the ACE Proactor platform. The application works fine for both VxWorks and Windows, but it does not work on Linux (CentOS 5.5, WindRiver Linux 1.4 and 3.0) with the 2.6.XX kernel - using librt.

I narrowed the problem down to a very simple problem: An application starts an asynchronous (via aio_read) read operation on a socket and then starts an asynchronous (via aio_write) write on the same socket. The read operation has not yet been completed because the protocol is initialized from the end of the application. - When the socket is in blocking mode, recording is never reached and the protocol freezes. - When using the O_NONBLOCK socket, writing is successful, but reading returns indefinitely with the error "EWOULDBLOCK / EAGAIN", never restored (even if the AIO reboots).

I went through several forums and could not find a definitive answer to the question whether this should work (and I am doing something wrong) or impossible with Linux AIO. Is it possible if I omitted AIO and looked for another implementation (via epoll / poll / select, etc.)?

Attached is a code example for quickly re-creating a problem in a non-blocking socket:

#include <aio.h> #include <stdio.h> #include <stdlib.h> #include <netdb.h> #include <string.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/types.h> #include <assert.h> #include <errno.h> #define BUFSIZE (100) // Global variables struct aiocb *cblist[2]; int theSocket; void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer) { bzero( (char *)pAiocb, sizeof(struct aiocb) ); pAiocb->aio_fildes = theSocket; pAiocb->aio_nbytes = BUFSIZE; pAiocb->aio_offset = 0; pAiocb->aio_buf = pBuffer; } void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer) { InitializeAiocbData(pAiocb, pBuffer); int ret = aio_read( pAiocb ); assert (ret >= 0); } void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer) { InitializeAiocbData(pAiocb, pBuffer); int ret = aio_write( pAiocb ); assert (ret >= 0); } int main() { int ret; int nPort = 11111; char* szServer = "10.10.9.123"; // Connect to the remote server theSocket = socket(AF_INET, SOCK_STREAM, 0); assert (theSocket >= 0); struct hostent *pServer; struct sockaddr_in serv_addr; pServer = gethostbyname(szServer); bzero((char *) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(nPort); bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length); assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0); // Set the socket to be non-blocking int oldFlags = fcntl(theSocket, F_GETFL) ; int newFlags = oldFlags | O_NONBLOCK; fcntl(theSocket, F_SETFL, newFlags); printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags); // Construct the AIO callbacks array struct aiocb my_aiocb1, my_aiocb2; char* pBuffer = new char[BUFSIZE+1]; bzero( (char *)cblist, sizeof(cblist) ); cblist[0] = &my_aiocb1; cblist[1] = &my_aiocb2; // Start the read and write operations on the same socket IssueReadOperation(&my_aiocb1, pBuffer); IssueWriteOperation(&my_aiocb2, pBuffer); // Wait for I/O completion on both operations int nRound = 1; printf("\naio_suspend round #%d:\n", nRound++); ret = aio_suspend( cblist, 2, NULL ); assert (ret == 0); // Check the error status for the read and write operations ret = aio_error(&my_aiocb1); assert (ret == EWOULDBLOCK); // Get the return code for the read { ssize_t retcode = aio_return(&my_aiocb1); printf("First read operation results: aio_error=%d, aio_return=%d - That the first EWOULDBLOCK\n", ret, retcode); } ret = aio_error(&my_aiocb2); assert (ret == EINPROGRESS); printf("Write operation is still \"in progress\"\n"); // Re-issue the read operation IssueReadOperation(&my_aiocb1, pBuffer); // Wait for I/O completion on both operations printf("\naio_suspend round #%d:\n", nRound++); ret = aio_suspend( cblist, 2, NULL ); assert (ret == 0); // Check the error status for the read and write operations for the second time ret = aio_error(&my_aiocb1); assert (ret == EINPROGRESS); printf("Second read operation request is suddenly marked as \"in progress\"\n"); ret = aio_error(&my_aiocb2); assert (ret == 0); // Get the return code for the write { ssize_t retcode = aio_return(&my_aiocb2); printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode); } // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely do { printf("\naio_suspend round #%d:\n", nRound++); ret = aio_suspend( cblist, 1, NULL ); assert (ret == 0); // Check the error of the read operation and re-issue if needed ret = aio_error(&my_aiocb1); if (ret == EWOULDBLOCK) { IssueReadOperation(&my_aiocb1, pBuffer); printf("EWOULDBLOCK again on the read operation!\n"); } } while (ret == EWOULDBLOCK); } 

Thanks in advance, Yotam.

+4
source share
1 answer

First, O_NONBLOCK and AIO do not mix. AIO will report that the asynchronous operation is completed when the corresponding read or write not blocked - and with O_NONBLOCK they will never be blocked, so the O_NONBLOCK request will always be completed immediately (using aio_return() providing EWOULDBLOCK ).

Secondly, do not use the same buffer for two simultaneous outstanding aio requests. The buffer should be considered completely disabled between the time when the aio_error() request was sent and when aio_error() reports that it has completed.

Thirdly, AIO requests for the same file descriptor are queued to give reasonable results. This means that your recording will not be performed until the reading is completed - if you need to write the data first, you need to release the AIO in the reverse order. The following will work fine without setting O_NONBLOCK :

 struct aiocb my_aiocb1, my_aiocb2; char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message"; const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 }; // Start the read and write operations on the same socket IssueWriteOperation(&my_aiocb2, pBuffer2); IssueReadOperation(&my_aiocb1, pBuffer1); // Wait for I/O completion on both operations int nRound = 1; int aio_status1, aio_status2; do { printf("\naio_suspend round #%d:\n", nRound++); ret = aio_suspend( cblist, 2, NULL ); assert (ret == 0); // Check the error status for the read and write operations aio_status1 = aio_error(&my_aiocb1); if (aio_status1 == EINPROGRESS) puts("aio1 still in progress."); else puts("aio1 completed."); aio_status2 = aio_error(&my_aiocb2); if (aio_status2 == EINPROGRESS) puts("aio2 still in progress."); else puts("aio2 completed."); } while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS); // Get the return code for the read ssize_t retcode; retcode = aio_return(&my_aiocb1); printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); retcode = aio_return(&my_aiocb1); printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

Alternatively, if you do not like the reads and writes arranged relative to each other, you can use dup() to create two file descriptors for the socket and use them for reading, and the other for writing - each will perform AIO operations separately.

+3
source

Source: https://habr.com/ru/post/1334522/


All Articles