recvmmsg() / recv() / recvfrom() block under heavy load

We have a CentOS 6 application that calls recvmmsg() on a multicast socket to read up to 1024 UDP packets at a time. When we run several instances of this application on the same machine (all listening to the same traffic), this call sometimes blocks for several seconds, even though the socket is non-blocking and MSG_DONTWAIT is passed. It works fine in all other circumstances, but stalls under heavy load (50 MB/s). While the application is blocked it falls behind the UDP traffic and cannot recover. The process runs under the SCHED_RR real-time scheduler at high priority to avoid interference from other processes. We tried switching to recvfrom() and to recv() in a for loop, with the same results.

The only thing we can see in the kernel source that could block here is the spin_lock_irqsave() on the receive-queue lock in __skb_try_recv_datagram(). But I do not know under what circumstances that would be a problem, what to do to prevent the blocking, or whether it is really the problem.

I'm not sure where to look next, so any pointers would be appreciated.

I created a very simple program that reproduces this on one of the servers where we see it (I have left out the interface-lookup function, which should not be relevant here; let me know if you need it).

Example with recv():

 /*
 * Reproducer (recv variant): join a multicast group, wait for readability
 * with epoll, then drain the socket with up to PACKETS_TO_READ non-blocking
 * recv() calls, printing how many datagrams were read and how long the
 * drain took.
 *
 * getInterface() is the poster's helper and is not shown.
 * NOTE(review): it is assumed to store the local interface address for the
 * given broadcast address in *interface and return non-zero on success --
 * confirm against the real helper.
 */

enum {
    MCAST_PORT = 4755,
    PACKETS_TO_READ = 1024, /* max datagrams drained per epoll wakeup */
    MAX_EVENTS = 8,         /* only one fd is registered with epoll */
    EPOLL_TIMEOUT_MS = 10,
};

/* Close fd and return -1; keeps the setup error paths short and leak-free. */
static int close_and_fail(int fd)
{
    close(fd);
    return -1;
}

/*
 * Create a non-blocking UDP socket bound to INADDR_ANY:MCAST_PORT and join
 * group 239.255.61.255 on the interface reported by getInterface().
 * Returns the socket fd, or -1 after printing a diagnostic.
 */
static int open_multicast_socket(void)
{
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        printf("Failed to create socket, errno: %d.\n", errno);
        return -1;
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("Failed to set O_NONBLOCK, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    /* Allows several instances to bind the same port and each receive the group. */
    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse) < 0) {
        printf("Failed to set SO_REUSEADDR, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    /* Designated initializer zeroes sin_zero and any other padding fields. */
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(MCAST_PORT),
        .sin_addr.s_addr = htonl(INADDR_ANY),
    };
    if (bind(fd, (struct sockaddr *)&addr, sizeof addr) < 0) {
        printf("Failed to bind, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    in_addr_t interface;
    if (!getInterface("192.168.15.255", &interface)) {
        printf("Failed to get interface.\n");
        return close_and_fail(fd);
    }

    struct ip_mreq imr;
    memset(&imr, 0, sizeof imr);
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST() takes a host-order address; s_addr is network order. */
    if (!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))) {
        printf("Group not in multicast.\n");
        return close_and_fail(fd);
    }
    if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &imr, sizeof imr) < 0) {
        printf("Failed to add membership, errno: %d.\n", errno);
        return close_and_fail(fd);
    }
    return fd;
}

/*
 * Microseconds elapsed between two gettimeofday() samples.
 * NOTE: gettimeofday() is wall-clock time and can jump if the clock is
 * stepped; CLOCK_MONOTONIC would be more robust for interval measurement.
 */
static long elapsed_us(const struct timeval *start, const struct timeval *end)
{
    return (long)(end->tv_sec - start->tv_sec) * 1000000L
         + (long)(end->tv_usec - start->tv_usec);
}

int main(void)
{
    int fd = open_multicast_socket();
    if (fd < 0)
        return 1;

    int epollInstance = epoll_create1(0);
    if (epollInstance < 0) {
        printf("Failed to create epoll instance, errno: %d.\n", errno);
        return 1;
    }
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
    if (epoll_ctl(epollInstance, EPOLL_CTL_ADD, fd, &ev) < 0) {
        printf("Failed to register socket with epoll, errno: %d.\n", errno);
        return 1;
    }

    /* Payloads are discarded, so a single scratch buffer is enough. */
    static char receiveBuffer[USHRT_MAX];
    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        int selected = epoll_wait(epollInstance, events, MAX_EVENTS, EPOLL_TIMEOUT_MS);
        if (selected <= 0)
            continue; /* timeout, or an error such as EINTR: wait again */

        struct timeval start, end;
        gettimeofday(&start, NULL);

        int numPackets = 0;
        int recvErr = 0;
        for (int i = 0; i < PACKETS_TO_READ; i++) {
            /* An empty queue is reported as -1 with errno EAGAIN/EWOULDBLOCK,
             * not as a return value equal to EAGAIN. */
            ssize_t result = recv(fd, receiveBuffer, sizeof receiveBuffer, MSG_DONTWAIT);
            if (result < 0) {
                recvErr = errno;
                break;
            }
            numPackets++;
        }

        gettimeofday(&end, NULL);

        /* Report real errors outside the timed section. */
        if (recvErr != 0 && recvErr != EAGAIN && recvErr != EWOULDBLOCK)
            printf("recv failed, errno: %d.\n", recvErr);
        printf("Got %d packets in %ld microseconds\n", numPackets, elapsed_us(&start, &end));
    }
}

Example with recvmmsg():

 /*
 * Reproducer (recvmmsg variant): join a multicast group, wait for
 * readability with epoll, then drain up to PACKETS_TO_READ datagrams with a
 * single non-blocking recvmmsg() call, printing how many were read and how
 * long the call took.
 *
 * recvmmsg() and struct mmsghdr are GNU extensions: _GNU_SOURCE must be
 * defined before the first #include for them to be declared.
 *
 * getInterface() is the poster's helper and is not shown.
 * NOTE(review): it is assumed to store the local interface address for the
 * given broadcast address in *interface and return non-zero on success --
 * confirm against the real helper.
 */

enum {
    MCAST_PORT = 4755,
    PACKETS_TO_READ = 1024, /* max datagrams per recvmmsg() call */
    MAX_EVENTS = 8,         /* only one fd is registered with epoll */
    EPOLL_TIMEOUT_MS = 10,
};

/* Close fd and return -1; keeps the setup error paths short and leak-free. */
static int close_and_fail(int fd)
{
    close(fd);
    return -1;
}

/*
 * Create a non-blocking UDP socket bound to INADDR_ANY:MCAST_PORT and join
 * group 239.255.61.255 on the interface reported by getInterface().
 * Returns the socket fd, or -1 after printing a diagnostic.
 */
static int open_multicast_socket(void)
{
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        printf("Failed to create socket, errno: %d.\n", errno);
        return -1;
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("Failed to set O_NONBLOCK, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    /* Allows several instances to bind the same port and each receive the group. */
    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse) < 0) {
        printf("Failed to set SO_REUSEADDR, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    /* Designated initializer zeroes sin_zero and any other padding fields. */
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(MCAST_PORT),
        .sin_addr.s_addr = htonl(INADDR_ANY),
    };
    if (bind(fd, (struct sockaddr *)&addr, sizeof addr) < 0) {
        printf("Failed to bind, errno: %d.\n", errno);
        return close_and_fail(fd);
    }

    in_addr_t interface;
    if (!getInterface("192.168.15.255", &interface)) {
        printf("Failed to get interface.\n");
        return close_and_fail(fd);
    }

    struct ip_mreq imr;
    memset(&imr, 0, sizeof imr);
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST() takes a host-order address; s_addr is network order. */
    if (!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))) {
        printf("Group not in multicast.\n");
        return close_and_fail(fd);
    }
    if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &imr, sizeof imr) < 0) {
        printf("Failed to add membership, errno: %d.\n", errno);
        return close_and_fail(fd);
    }
    return fd;
}

/*
 * Microseconds elapsed between two gettimeofday() samples.
 * NOTE: gettimeofday() is wall-clock time and can jump if the clock is
 * stepped; CLOCK_MONOTONIC would be more robust for interval measurement.
 */
static long elapsed_us(const struct timeval *start, const struct timeval *end)
{
    return (long)(end->tv_sec - start->tv_sec) * 1000000L
         + (long)(end->tv_usec - start->tv_usec);
}

int main(void)
{
    /* NOTE(review): receiveBuffer is ~64 MiB of static storage whose pages are
     * faulted in on first write, so early batches may pay page-fault cost.
     * Consider prefaulting (e.g. mlockall) if that affects the measurement. */
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];

    int fd = open_multicast_socket();
    if (fd < 0)
        return 1;

    int epollInstance = epoll_create1(0);
    if (epollInstance < 0) {
        printf("Failed to create epoll instance, errno: %d.\n", errno);
        return 1;
    }
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
    if (epoll_ctl(epollInstance, EPOLL_CTL_ADD, fd, &ev) < 0) {
        printf("Failed to register socket with epoll, errno: %d.\n", errno);
        return 1;
    }

    /* One iovec and one source-address slot per datagram in the batch. */
    for (int i = 0; i < PACKETS_TO_READ; i++) {
        iovecs[i].iov_base = receiveBuffer[i];
        iovecs[i].iov_len = USHRT_MAX;
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        msgs[i].msg_hdr.msg_name = &sockFrom[i];
        /* The kernel writes back the actual address length. For IPv4 senders
         * that is always sizeof(struct sockaddr_in), so it is not reset per call. */
        msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
    }

    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        int selected = epoll_wait(epollInstance, events, MAX_EVENTS, EPOLL_TIMEOUT_MS);
        if (selected <= 0)
            continue; /* timeout, or an error such as EINTR: wait again */

        struct timeval start, end;
        gettimeofday(&start, NULL);
        int numPackets = recvmmsg(fd, msgs, PACKETS_TO_READ, MSG_DONTWAIT, NULL);
        int recvErr = (numPackets < 0) ? errno : 0; /* capture before other calls */
        gettimeofday(&end, NULL);

        if (numPackets < 0) {
            /* An empty queue is reported as -1/EAGAIN; treat it as zero packets. */
            if (recvErr != EAGAIN && recvErr != EWOULDBLOCK)
                printf("recvmmsg failed, errno: %d.\n", recvErr);
            numPackets = 0;
        }
        printf("Got %d packets in %ld microseconds\n", numPackets, elapsed_us(&start, &end));
    }
}
+5
source share

Source: https://habr.com/ru/post/1269003/


All Articles