I am writing a simple test client and IOCP server to make sure that I am using the API correctly and that the data sent by the client is received correctly by the server. I have included all the code for this question.
Here, I ran into a problem: sometimes the data in the receive buffers is corrupted (corrupted in the sense that chunks of data within a buffer may be out of order or missing). To be clear, this is data within individual receive buffers; I do not mean out-of-order delivery between several buffers due to thread scheduling. I previously posted a related question here. However, I have since done more work to produce a proper code example, so I am posting a new question and will link to it from the old one. I hope others can run this code and experience the same strange behavior.
Test code
The test application can run in one of two modes: client or server. Start the server and it begins listening; start the client and it connects to the server and, as soon as it is connected, starts sending data to the server as fast as the server will allow. The server then validates the data in each buffer returned from GetQueuedCompletionStatus after a WSARecv call. Each time a WSASend completes, I zero out the OVERLAPPED section of the structure and call WSASend again with the original data buffer.
Each data buffer sent by the client is a sequence of bytes that increments by one up to a specified maximum. I do not send the full range 0..255 in case that size fits neatly into packets and somehow hides the problem, so in my example code the bytes range from 0..250. For each send buffer that is built, I repeat this pattern numberOfGroups times.
, WSARecvs, , . .. , max, reset 0. , , .
, WSASend WSARecv . , , , 2 WSARecv. 1 , .
I've been using Windows 7 and Visual Studio 2010 C++.
, , , . 2 , -, , .
IOCP, , , . , , - WSARecv GetQueuedCompletionStatus.
WSARecv
// Posts one overlapped WSARecv on this connection's socket.
//
// overlapped: a TestOverlapped to reuse (its OVERLAPPED part is cleared first),
//             or nullptr to allocate a new one with a receiveBufferSize buffer.
//
// If WSARecv fails with anything other than WSA_IO_PENDING, the error is
// reported and the TestOverlapped is deleted, so callers must not touch it
// after this call.
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
    const int returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), nullptr);
    if (returnCode == SOCKET_ERROR)
    {
        const int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            // The receive was never started, so no completion will be queued
            // for it; free the request here or it is leaked.
            std::cerr << "Error calling WSARecv. Returned errorCode = " << errorCode << std::endl;
            delete overlapped;
        }
    }
}
WSARecv , - ,
void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred);
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
default:;
}
}
}
}
- > onRecv - . - ?
, .
To run as a server listening on port 3000 with 2 pending WSARecv calls:
> IOCPTest.exe server 3000 2
To run as a client connecting to 127.0.0.1 on port 3000 with 2 pending WSASend calls:
> IOCPTest.exe client 127.0.0.1 3000 2
IOCPConnectionManager
Listens for incoming connections, owns the completion port, and starts the worker threads.
IOCPConnection
Wraps a SOCKET. IOCPConnection::onRecv is called when a WSARecv completes, and validates the received data.
IOCPWorker
A worker thread. IOCPWorker::execute() is the thread loop, which calls GetQueuedCompletionStatus.
TestOverlapped
An extended OVERLAPPED structure.
Link against Ws2_32.lib and Mswsock.lib.
cpp
#include "stdafx.h"
#include <iostream>
#include <string>
#include "IOCPTest.h"
#include <Windows.h>
// Prints the usage banner and then waits for the user to press enter.
void printUse()
{
    static const char *const usageText =
        "Invalid arguments\n"
        "This test app has very limited error handling or memory management\n"
        "Run as client or server (run the server first) e.g.\n"
        "\n"
        "To run as server listening on port 3000 with 2 pending receives:\n"
        "> IOCPTester.exe server 3000 2\n"
        "\n"
        "To run as client connected to 127.0.0.1 on port 3000 with 2 pending sends:\n"
        "> IOCPTester.exe client 127.0.0.1 3000 2\n"
        "\n"
        "Hit enter to exit\n";
    // Flush explicitly so the text is visible before blocking on input.
    std::cout << usageText << std::flush;
    std::cin.ignore();
}
// Entry point. Accepted command lines:
//   server <port> <pending receives>
//   client <host> <port> <pending sends>
// Anything else prints the usage text. After starting, waits for enter.
int main(int argc, char *argv[])
{
    const std::string mode = (argc >= 4) ? argv[1] : "";
    const bool runAsServer = (mode == "server");
    const bool runAsClient = (mode == "client");
    if (!runAsServer && !runAsClient)
    {
        printUse();
        return 0;
    }

    // Never freed: the manager lives for the rest of the process.
    IOCPTest::IOCPConnectionManager *manager = new IOCPTest::IOCPConnectionManager();
    if (runAsServer)
    {
        manager->listenPort = atoi(argv[2]);
        manager->postedReceiveCount = atoi(argv[3]);
        manager->postedSendCount = 1;
        manager->startListening();
    }
    else
    {
        // The client form takes one more argument than the server form.
        if (argc < 5)
        {
            printUse();
            return 0;
        }
        manager->postedReceiveCount = 1;
        manager->postedSendCount = atoi(argv[4]);
        IOCPTest::IOCPConnection *connection = manager->createConnection();
        connection->host = argv[2];
        connection->port = atoi(argv[3]);
        connection->connect();
    }

    std::cout << "Hit enter to exit" << std::endl;
    std::cin.ignore();
}
IOCPTest.h
#ifndef IOCPTestH
#define IOCPTestH
#include <WinSock2.h> // Include before Windows.h, as otherwise Windows.h includes the older Winsock header and causes issues
#include <Windows.h>
#include <string>
namespace IOCPTest
{
    class IOCPConnection;

    // Kind of overlapped operation a TestOverlapped was posted for; the worker
    // thread switches on this when the completion is dequeued.
    enum IOCPSocketOperation
    {
        soUnknown,
        soAccept,
        soConnect,
        soDisconnect,
        soSend,
        soRecv,
        soQuit
    };

    // Per-operation context handed to the overlapped Winsock calls.
    struct TestOverlapped
    {
        // Kept as the first member: the code casts TestOverlapped* to and from
        // LPOVERLAPPED.
        OVERLAPPED overlapped;
        // Data buffer; buffer.buf is malloc'd by the constructor and freed by
        // the destructor.
        WSABUF buffer;
        IOCPSocketOperation operation;
        // Connection the operation was posted on.
        IOCPConnection *connection;
        // For sends: when true the worker sends the same buffer again after
        // each completion instead of deleting the structure.
        bool resend;

        // bufferSize: bytes to allocate for buffer; no allocation when <= 0.
        explicit TestOverlapped(int bufferSize);
        ~TestOverlapped();
        // Clears the OVERLAPPED part so the structure can be posted again.
        void reset();

    private:
        // Not copyable: a copy would share buffer.buf and free it twice.
        // Declared but intentionally not defined.
        TestOverlapped(const TestOverlapped &);
        TestOverlapped &operator=(const TestOverlapped &);
    };
    typedef TestOverlapped *PTestOverlapped;

    // Owns the completion port, starts the worker threads and (in server mode)
    // the listening socket.
    class IOCPConnectionManager
    {
    public:
        // Number of AcceptEx calls posted by startListening().
        static const int NUMACCEPTS = 5;
        WSADATA wsaData;
        HANDLE iocp;
        SOCKET listenSocket;
        USHORT listenPort;
        // Receives posted on each accepted connection.
        int postedReceiveCount;
        // Send buffers posted once a client connection is established.
        int postedSendCount;
        void startListening();
        void postAcceptEx();
        IOCPConnection *createConnection();
        IOCPConnectionManager();
    };

    // One socket plus the handlers the worker threads call when an operation
    // on it completes.
    class IOCPConnection
    {
    public:
        SOCKET socket;
        IOCPConnectionManager *manager;
        // Remote host and port used by connect().
        std::string host;
        USHORT port;
        void onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
        void postRecv(PTestOverlapped overlapped = nullptr);
        void onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
        void onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
        void send(PTestOverlapped overlapped);
        void onSent(PTestOverlapped overlapped, DWORD numberOfBytesTransferred);
        void connect();
    };

    // A thread that services the manager's completion port.
    class IOCPWorker
    {
    public:
        HANDLE threadHandle;
        DWORD threadId;
        // Must be set before start() is called; execute() reads manager->iocp.
        IOCPConnectionManager *manager;
        // suspended: create the thread with CREATE_SUSPENDED.
        explicit IOCPWorker(bool suspended);
        void start();
        void execute();
    };
}
#endif // IOCPTestH
IOCPTest.cpp
#include "stdafx.h"
#include "IOCPTest.h"
#include <iostream>
#include <Mswsock.h>
#include <WS2tcpip.h>
#include <sstream>
namespace IOCPTest
{
// Winsock extension function pointers. They start out null and are filled in
// by the AcceptEx/ConnectEx wrappers below on first use.
LPFN_ACCEPTEX fnAcceptEx = nullptr;
LPFN_CONNECTEX fnConnectEx = nullptr;
// Extension GUIDs passed to WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER).
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidConnectEx = WSAID_CONNECTEX;
// Largest byte value in the test pattern: the client sends 0..maxByteExpected
// repeatedly and the server expects the sequence to wrap to 0 after it.
const byte maxByteExpected = 250;
// Number of times the 0..maxByteExpected pattern is repeated per send buffer.
const int numberOfGroups = 4096;
// Size in bytes of each buffer posted to WSARecv.
const int receiveBufferSize = 0x100000;
// Wrapper for the AcceptEx Winsock extension. Looks the function pointer up
// through WSAIoctl on first use, caches it in fnAcceptEx, then forwards all
// arguments. Returns false if the lookup fails.
BOOL AcceptEx
(
    SOCKET sListenSocket,
    SOCKET sAcceptSocket,
    PVOID lpOutputBuffer,
    DWORD dwReceiveDataLength,
    DWORD dwLocalAddressLength,
    DWORD dwRemoteAddressLength,
    LPDWORD lpdwBytesReceived,
    LPOVERLAPPED lpOverlapped
)
{
    DWORD bytesReturned;
    // Short-circuit: WSAIoctl is only called while the pointer is unresolved.
    if (fnAcceptEx == nullptr &&
        WSAIoctl(sListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof (GuidAcceptEx), &fnAcceptEx, sizeof(fnAcceptEx), &bytesReturned, NULL, NULL) != 0)
    {
        std::cerr << "Error calling WSAIoctl for AcceptEx" << std::endl;
        return FALSE;
    }
    return fnAcceptEx(sListenSocket, sAcceptSocket, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped);
}
// Wrapper for the ConnectEx Winsock extension. Looks the function pointer up
// through WSAIoctl on first use, caches it in fnConnectEx, then forwards all
// arguments. Returns false if the lookup fails.
BOOL ConnectEx(
    SOCKET s,
    const struct sockaddr FAR *name,
    int namelen,
    PVOID lpSendBuffer,
    DWORD dwSendDataLength,
    LPDWORD lpdwBytesSent,
    LPOVERLAPPED lpOverlapped
)
{
    DWORD bytesReturned;
    // Short-circuit: WSAIoctl is only called while the pointer is unresolved.
    if (fnConnectEx == nullptr &&
        WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx, sizeof (GuidConnectEx), &fnConnectEx, sizeof(fnConnectEx), &bytesReturned, NULL, NULL) != 0)
    {
        std::cerr << "Error calling WSAIoctl for ConnectEx" << std::endl;
        return FALSE;
    }
    return fnConnectEx(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped);
}
// Zero-initialises every member and, for a positive bufferSize, allocates a
// data buffer of that many bytes with malloc.
TestOverlapped::TestOverlapped(int bufferSize):
    overlapped(),
    buffer(),
    operation(soUnknown),
    connection(nullptr),
    resend(false)
{
    // A non-positive size leaves buffer.buf null and buffer.len zero.
    if (bufferSize <= 0)
    {
        return;
    }
    buffer.buf = static_cast<CHAR *>(malloc(bufferSize));
    buffer.len = bufferSize;
}
// Releases the data buffer allocated by the constructor, if any.
TestOverlapped::~TestOverlapped()
{
    // free() accepts a null pointer and does nothing, so no guard is needed.
    free(buffer.buf);
}
void TestOverlapped::reset()
{
overlapped = OVERLAPPED();
}
// Initialises Winsock, creates the completion port and starts one worker
// thread per processor reported by GetSystemInfo.
//
// If Winsock or the completion port cannot be set up, the error is reported
// and no workers are started (they would have nothing valid to wait on).
IOCPConnectionManager::IOCPConnectionManager():
    wsaData(),
    iocp(nullptr),
    listenSocket(INVALID_SOCKET),
    listenPort(0),
    postedReceiveCount(1),
    postedSendCount(1)
{
    // WSAStartup returns the error code directly rather than via WSAGetLastError.
    const int startupResult = WSAStartup(WINSOCK_VERSION, &wsaData);
    if (startupResult != 0)
    {
        std::cerr << "Error calling WSAStartup. Returned errorCode = " << startupResult << std::endl;
        return;
    }
    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
    if (iocp == nullptr)
    {
        std::cerr << "Error creating the completion port. GetLastError = " << GetLastError() << std::endl;
        return;
    }
    SYSTEM_INFO systemInfo = SYSTEM_INFO();
    GetSystemInfo(&systemInfo);
    for (DWORD i = 0; i < systemInfo.dwNumberOfProcessors; ++i)
    {
        // Created suspended so that manager is set before execute() runs.
        // Workers are never freed; they live for the rest of the process.
        IOCPWorker *worker = new IOCPWorker(true);
        worker->manager = this;
        worker->start();
    }
}
void IOCPConnectionManager::startListening()
{
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
CreateIoCompletionPort((HANDLE)listenSocket, iocp, ULONG_PTR(this), 0);
sockaddr_in localAddress = sockaddr_in();
localAddress.sin_family = AF_INET;
localAddress.sin_addr.s_addr = INADDR_ANY;
localAddress.sin_port = htons(listenPort);
if (bind(listenSocket, (SOCKADDR*) &localAddress, sizeof(localAddress)) == SOCKET_ERROR)
{
std::cerr << "Error in binding listening socket" << std::endl;
}
if (listen(listenSocket, SOMAXCONN) == 0)
{
std::cout << "Listening on port " << listenPort << std::endl;
}
for (int i = 0; i < NUMACCEPTS; i++)
{
postAcceptEx();
}
}
void IOCPConnectionManager::postAcceptEx()
{
SOCKET acceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
IOCPConnection *connection = new IOCPConnection();
connection->manager = this;
connection->socket = acceptSocket;
CreateIoCompletionPort((HANDLE) acceptSocket, iocp, ULONG_PTR(connection), 0);
PTestOverlapped overlapped = new TestOverlapped(2 * (sizeof(sockaddr_in) + 16));
overlapped->operation = soAccept;
overlapped->connection = connection;
DWORD byesReceived = 0;
int result = IOCPTest::AcceptEx
(
listenSocket,
acceptSocket,
overlapped->buffer.buf,
0,
sizeof(sockaddr_in) + 16,
sizeof(sockaddr_in) + 16,
&byesReceived,
(LPOVERLAPPED) overlapped
);
if (!result)
{
int errorCode = WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
std::cerr << "Error calling AcceptEx. Returned errorCode = " << errorCode << std::endl;
}
}
}
// Allocates a new IOCPConnection that points back at this manager.
// The caller receives the raw pointer; nothing here frees it.
IOCPConnection *IOCPConnectionManager::createConnection()
{
    IOCPConnection *const created = new IOCPConnection();
    created->manager = this;
    return created;
}
// Called by a worker thread when the AcceptEx posted for this connection
// completes. Neither parameter is used.
void IOCPConnection::onAcceptEx(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    // Post a replacement accept first, as the original ordering does.
    manager->postAcceptEx();

    const SOCKET &listener = manager->listenSocket;
    const int optionResult = setsockopt(socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, reinterpret_cast<const char *>(&listener), sizeof(listener));
    if (optionResult == SOCKET_ERROR)
    {
        std::cerr << "SetSockOpt in OnAcceptEx returned SOCKET_ERROR" << std::endl;
    }
    std::cout << "Connection Accepted" << std::endl;

    // Post the configured number of concurrent receives on the new socket.
    int remaining = manager->postedReceiveCount;
    while (remaining-- > 0)
    {
        postRecv();
    }
}
// Posts one overlapped WSARecv on this connection's socket.
//
// overlapped: a TestOverlapped to reuse (its OVERLAPPED part is cleared first),
//             or nullptr to allocate a new one with a receiveBufferSize buffer.
//
// If WSARecv fails with anything other than WSA_IO_PENDING, the error is
// reported and the TestOverlapped is deleted, so callers must not touch it
// after this call.
void IOCPConnection::postRecv(PTestOverlapped overlapped)
{
    DWORD numberOfBytesTransferred = 0;
    DWORD flags = 0;
    if (overlapped == nullptr)
    {
        overlapped = new TestOverlapped(receiveBufferSize);
        overlapped->connection = this;
    }
    else
    {
        overlapped->reset();
    }
    overlapped->operation = soRecv;
    const int returnCode = WSARecv(socket, &(overlapped->buffer), 1, &numberOfBytesTransferred, &flags, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), nullptr);
    if (returnCode == SOCKET_ERROR)
    {
        const int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            // The receive was never started, so no completion will be queued
            // for it; free the request here or it is leaked.
            std::cerr << "Error calling WSARecv. Returned errorCode = " << errorCode << std::endl;
            delete overlapped;
        }
    }
}
// Validates the bytes delivered by one completed receive.
//
// The buffer may start at any value in 0..maxByteExpected; every following
// byte must be the previous one plus one, wrapping from maxByteExpected to 0.
// The first violation is reported on std::cerr and validation stops.
//
// overlapped:               the completed request; its buffer holds the data.
// numberOfBytesTransferred: number of valid bytes in the buffer.
void IOCPConnection::onRecv(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    if (numberOfBytesTransferred == 0)
    {
        return;
    }
    const byte *data = reinterpret_cast<const byte *>(overlapped->buffer.buf);
    if (data[0] > maxByteExpected)
    {
        // Cast to int: streaming a byte directly prints it as a character,
        // not as the number the message is meant to show.
        std::cerr << "Byte greater than max expected found. Max Expected: " << static_cast<int>(maxByteExpected) << "; Found: " << static_cast<int>(data[0]) << std::endl;
        return;
    }
    byte expected = data[0];
    for (DWORD i = 1; i < numberOfBytesTransferred; ++i)
    {
        // Advance the expected value, wrapping after maxByteExpected.
        expected = (expected == maxByteExpected) ? static_cast<byte>(0) : static_cast<byte>(expected + 1);
        if (data[i] != expected)
        {
            std::cerr << "Invalid data. Expected: " << static_cast<int>(expected) << "; Got: " << static_cast<int>(data[i]) << std::endl;
            return;
        }
    }
}
// Called by a worker thread when ConnectEx completes. Builds
// manager->postedSendCount send buffers and posts each one.
//
// Every buffer holds the byte pattern 0..maxByteExpected repeated
// numberOfGroups times and is flagged for resending after each completion.
// Neither parameter is used.
void IOCPConnection::onConnect(PTestOverlapped overlapped, DWORD numberOfBytesTransferred)
{
    const int groupSize = maxByteExpected + 1;
    const int sendBufferSize = groupSize * numberOfGroups;
    for (int i = 0; i < manager->postedSendCount; ++i)
    {
        PTestOverlapped sendOverlapped = new TestOverlapped(sendBufferSize);
        if (sendOverlapped->buffer.buf == nullptr)
        {
            std::cerr << "Error allocating send buffer" << std::endl;
            delete sendOverlapped;
            return;
        }
        sendOverlapped->connection = this;
        byte *pattern = reinterpret_cast<byte *>(sendOverlapped->buffer.buf);
        // An int index is used on purpose: a byte counter compared with
        // "<= maxByteExpected" would never terminate if the maximum were 255.
        for (int offset = 0; offset < sendBufferSize; ++offset)
        {
            pattern[offset] = static_cast<byte>(offset % groupSize);
        }
        sendOverlapped->resend = true;
        send(sendOverlapped);
    }
}
// Posts one overlapped WSASend of overlapped->buffer on this connection.
//
// overlapped: the request to post; its OVERLAPPED part is cleared and its
//             operation set to soSend first.
//
// If WSASend fails with anything other than WSA_IO_PENDING, the error is
// reported and the request is deleted, so callers must not touch it after
// this call.
void IOCPConnection::send(PTestOverlapped overlapped)
{
    overlapped->reset();
    overlapped->operation = soSend;
    DWORD bytesSent = 0;
    const DWORD flags = 0;
    if (WSASend(socket, &overlapped->buffer, 1, &bytesSent, flags, reinterpret_cast<LPWSAOVERLAPPED>(overlapped), nullptr) == SOCKET_ERROR)
    {
        const int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            // The send was never started, so no completion will be queued
            // for it; free the request here or it is leaked.
            std::cerr << "Error calling WSASend. Returned errorCode = " << errorCode << std::endl;
            delete overlapped;
        }
    }
}
// Hook called by a worker thread when a send completes.
void IOCPConnection::onSent(PTestOverlapped /*overlapped*/, DWORD /*numberOfBytesTransferred*/)
{
    // Intentionally empty: the worker decides whether the buffer is resent.
}
// Starts an overlapped connection to host:port with ConnectEx.
//
// Creates the socket, associates it with the manager's completion port (this
// connection is the completion key), binds it to any local address, resolves
// the remote address and posts ConnectEx. The result is delivered to a worker
// thread as an soConnect completion.
//
// On any failure the error is reported, the socket is closed and the socket
// member is reset to INVALID_SOCKET.
void IOCPConnection::connect()
{
    socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socket == INVALID_SOCKET)
    {
        std::cerr << "Error calling socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) in IOCPConnection::connect()" << std::endl;
        return;
    }
    if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket), manager->iocp, reinterpret_cast<ULONG_PTR>(this), 0) == nullptr)
    {
        std::cerr << "Error associating socket with the completion port in IOCPConnection::connect(). GetLastError = " << GetLastError() << std::endl;
        closesocket(socket);
        socket = INVALID_SOCKET;
        return;
    }

    // ConnectEx requires a socket that is already bound.
    sockaddr_in localAddress = sockaddr_in();
    localAddress.sin_family = AF_INET;
    localAddress.sin_addr.s_addr = INADDR_ANY;
    localAddress.sin_port = 0;
    if (bind(socket, reinterpret_cast<SOCKADDR *>(&localAddress), sizeof(localAddress)) == SOCKET_ERROR)
    {
        std::cerr << "Error calling bind(socket, (SOCKADDR *) &localAddress, sizeof(localAddress) in IOCPConnection::connect()" << std::endl;
        closesocket(socket);
        socket = INVALID_SOCKET;
        return;
    }

    // AI_PASSIVE is deliberately not set: it asks for an address to bind to,
    // whereas this lookup is for the remote address to connect to.
    addrinfo hints = addrinfo();
    addrinfo *remoteAddress = nullptr;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    std::stringstream ss;
    ss << port;
    if (getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) != 0)
    {
        std::cerr << "Error calling getaddrinfo(host.c_str(), ss.str().c_str(), &hints, &remoteAddress) in IOCPConnection::connect()" << std::endl;
        closesocket(socket);
        socket = INVALID_SOCKET;
        return;
    }

    // No data buffer is needed: nothing is sent as part of the connect.
    TestOverlapped *overlapped = new TestOverlapped(0);
    overlapped->connection = this;
    overlapped->operation = soConnect;
    const BOOL result = IOCPTest::ConnectEx
    (
        socket,
        remoteAddress->ai_addr,
        static_cast<int>(remoteAddress->ai_addrlen),
        nullptr,
        0,
        nullptr,
        reinterpret_cast<LPOVERLAPPED>(overlapped)
    );
    if (result == FALSE)
    {
        const int errorCode = WSAGetLastError();
        if (errorCode != WSA_IO_PENDING)
        {
            // The connect was never started, so no completion will arrive to
            // free the request.
            std::cerr << "Error calling ConnectEx. Returned errorCode = " << errorCode << std::endl;
            delete overlapped;
            closesocket(socket);
            socket = INVALID_SOCKET;
        }
    }
    freeaddrinfo(remoteAddress);
}
// Thread entry point passed to CreateThread; lpParam is the IOCPWorker whose
// execute() loop this thread runs. Always returns 0.
DWORD WINAPI IOCPWorkerThreadProc(LPVOID lpParam)
{
    IOCPWorker *const worker = static_cast<IOCPWorker *>(lpParam);
    worker->execute();
    return 0;
}
// Creates the worker's thread, which runs IOCPWorkerThreadProc with this
// object as its argument. When suspended is true the thread is created with
// CREATE_SUSPENDED and does not run until start() is called.
IOCPWorker::IOCPWorker(bool suspended)
{
    const DWORD creationFlags = suspended ? CREATE_SUSPENDED : 0;
    threadHandle = CreateThread(nullptr, 0, IOCPWorkerThreadProc, this, creationFlags, &threadId);
}
void IOCPWorker::start()
{
ResumeThread(threadHandle);
}
void IOCPWorker::execute()
{
bool quit = false;
DWORD numberOfBytesTransferred = 0;
ULONG_PTR completionKey = NULL;
PTestOverlapped overlapped = nullptr;
while (!quit)
{
auto queueResult = GetQueuedCompletionStatus(manager->iocp, &numberOfBytesTransferred, &completionKey, (LPOVERLAPPED *)&overlapped, INFINITE);
if (queueResult)
{
switch (overlapped->operation)
{
case soAccept:
{
IOCPConnection *connection = overlapped->connection;
connection->onAcceptEx(overlapped, numberOfBytesTransferred);
delete overlapped;
overlapped = nullptr;
break;
}
case soConnect:
{
std::cout << "ConnectEx returned" << std::endl;
IOCPConnection *connection = overlapped->connection;
connection->onConnect(overlapped, numberOfBytesTransferred);
delete overlapped;
overlapped = nullptr;
break;
}
case soRecv:
{
IOCPConnection *connection = overlapped->connection;
connection->onRecv(overlapped, numberOfBytesTransferred);
overlapped->reset();
connection->postRecv(overlapped);
overlapped = nullptr;
break;
}
case soSend:
{
IOCPConnection *connection = overlapped->connection;
connection->onSent(overlapped, numberOfBytesTransferred);
std::cout << "Resending buffer" << std::endl;
if (overlapped->resend)
{
connection->send(overlapped);
}
else
{
delete overlapped;
}
overlapped = nullptr;
break;
}
default:;
}
}
}
}
}
, , 2 :
Invalid data. Expected: 169; Got: 123
Invalid data. Expected: 114; Got: 89
Invalid data. Expected: 89; Got: 156
Invalid data. Expected: 206; Got: 227
Invalid data. Expected: 125; Got: 54
Invalid data. Expected: 25; Got: 0
Invalid data. Expected: 58; Got: 146
Invalid data. Expected: 33; Got: 167
Invalid data. Expected: 212; Got: 233
Invalid data. Expected: 111; Got: 86
Invalid data. Expected: 86; Got: 153
Invalid data. Expected: 190; Got: 165
Invalid data. Expected: 175; Got: 150
Invalid data. Expected: 150; Got: 217
Invalid data. Expected: 91; Got: 112
Invalid data. Expected: 95; Got: 162
Invalid data. Expected: 207; Got: 182
Invalid data. Expected: 222; Got: 243
Invalid data. Expected: 126; Got: 101
Invalid data. Expected: 157; Got: 132
Invalid data. Expected: 160; Got: 89
Invalid data. Expected: 205; Got: 180
Invalid data. Expected: 113; Got: 134
Invalid data. Expected: 45; Got: 20
Invalid data. Expected: 113; Got: 201
Invalid data. Expected: 64; Got: 198
Invalid data. Expected: 115; Got: 182
Invalid data. Expected: 140; Got: 115
, - , . , , , - , . , IOCP, , , . , IOCP, , . , . , - .