How can I abort socket.recvfrom() from another thread in Python?

This may look like a duplicate of "How do I cancel socket.recv() from another thread in Python?", but it is not, because I want to abort recvfrom() in a thread, which is UDP, not TCP.

Can this be solved with poll() or select.select()?

+6
source share
3 answers

If you want to unblock a UDP read from another thread, send it a datagram!

Rgds, Martin

+5
source

A good way to handle this kind of asynchronous interrupt is the old pipe trick. You create a pipe and use select/poll on both the socket and the pipe: when you need to interrupt the receiver, you simply write a character to the pipe.

  • Pros:
    • It works for both UDP and TCP
    • It is protocol agnostic
  • Cons:
    • select/poll on pipes is not available on Windows; in that case you should replace the pipe with another UDP socket that is used as the notification channel

starting point

interruptable_socket.py

 import os
 import select
 import socket


 class InterruptableUdpSocketReceiver(object):
     """UDP receiver whose blocking ``recv`` can be aborted from another thread.

     A pipe is waited on together with the socket through ``select.select``.
     ``interrupt()`` writes one byte to the pipe, which wakes up a pending
     ``recv()``. Once interrupted, the receiver cannot be reused.

     The instance owns one socket and two pipe descriptors. Release them with
     ``close()`` or by using the instance as a context manager.
     """

     def __init__(self, host, port):
         """Create the UDP socket and the notification pipe.

         Args:
             host: Address passed to ``socket.bind`` by ``bind()``.
             port: Port passed to ``socket.bind`` by ``bind()``.
         """
         self._host = host
         self._port = port
         self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self._r_pipe, self._w_pipe = os.pipe()
         self._interrupted = False
         self._closed = False

     def __enter__(self):
         return self

     def __exit__(self, exc_type, exc_value, traceback):
         self.close()
         return False

     def bind(self):
         """Bind the socket to the host and port given to the constructor."""
         self._socket.bind((self._host, self._port))

     def recv(self, buffersize, flags=0):
         """Block until a datagram arrives or ``interrupt()`` is called.

         Args:
             buffersize: Maximum number of bytes to read from the socket.
             flags: Flags forwarded to ``socket.recv``.

         Returns:
             The received datagram, or ``""`` when the wait ended because of
             an interrupt rather than because the socket became readable.

         Raises:
             RuntimeError: If the receiver was already interrupted or closed.
         """
         if self._interrupted:
             raise RuntimeError("Cannot be reused")
         readable, _, _ = select.select(
             [self._r_pipe, self._socket], [], [self._socket])
         if self._socket in readable:
             return self._socket.recv(buffersize, flags)
         return ""

     def interrupt(self):
         """Mark the receiver as interrupted and wake up a pending ``recv()``."""
         self._interrupted = True
         # Never write to a descriptor we already closed: its number may have
         # been handed out again to an unrelated file.
         if not self._closed:
             os.write(self._w_pipe, "I".encode())

     def close(self):
         """Release the socket and both pipe descriptors.

         Safe to call more than once. After closing, ``recv()`` raises
         ``RuntimeError`` just as it does after an interrupt. This method does
         not itself wake up a ``recv()`` that is blocked in another thread;
         call ``interrupt()`` first for that.
         """
         if self._closed:
             return
         self._closed = True
         self._interrupted = True
         try:
             self._socket.close()
         finally:
             try:
                 os.close(self._r_pipe)
             finally:
                 os.close(self._w_pipe)

Test suite:

test_interruptable_socket.py

 import socket
 import time
 import unittest
 from threading import Timer

 from interruptable_socket import InterruptableUdpSocketReceiver


 class Sender(object):
     """Minimal UDP sender used to feed datagrams to the receiver under test."""

     def __init__(self, destination_host, destination_port):
         self._socket = socket.socket(
             socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
         self._dest = (destination_host, destination_port)

     def send(self, message):
         """Send ``message`` (bytes) to the configured destination."""
         self._socket.sendto(message, self._dest)

     def close(self):
         """Release the underlying UDP socket."""
         self._socket.close()


 class Test(unittest.TestCase):
     """Behavioural tests for ``InterruptableUdpSocketReceiver``."""

     def create_receiver(self, host="127.0.0.1", port=3010):
         """Return a receiver already bound to ``host``:``port``."""
         receiver = InterruptableUdpSocketReceiver(host, port)
         receiver.bind()
         return receiver

     def create_sender(self, host="127.0.0.1", port=3010):
         """Return a sender targeting ``host``:``port``; closed on test exit."""
         sender = Sender(host, port)
         # Without this the sender's socket stays open after the test ends.
         self.addCleanup(sender.close)
         return sender

     def create_sender_receiver(self, host="127.0.0.1", port=3010):
         """Return a ``(sender, receiver)`` pair sharing the same address."""
         return self.create_sender(host, port), self.create_receiver(host, port)

     def test_create(self):
         self.create_receiver()

     def test_recv_async(self):
         """``recv()`` returns the datagram that is sent 0.1 s after the call."""
         sender, receiver = self.create_sender_receiver()
         start = time.time()
         send_message = "TEST".encode('UTF-8')
         Timer(0.1, sender.send, (send_message, )).start()
         message = receiver.recv(128)
         elapsed = time.time() - start
         # NOTE(review): the 15 ms window is tight and may need widening on a
         # heavily loaded machine.
         self.assertGreaterEqual(elapsed, 0.095)
         self.assertLess(elapsed, 0.11)
         self.assertEqual(message, send_message)

     def test_interrupt_async(self):
         """``recv()`` returns an empty message when interrupted after 0.1 s."""
         receiver = self.create_receiver()
         start = time.time()
         Timer(0.1, receiver.interrupt).start()
         message = receiver.recv(128)
         elapsed = time.time() - start
         self.assertGreaterEqual(elapsed, 0.095)
         self.assertLess(elapsed, 0.11)
         self.assertEqual(0, len(message))

     def test_exception_after_interrupt(self):
         """An interrupted receiver refuses any further ``recv()``."""
         sender, receiver = self.create_sender_receiver()
         receiver.interrupt()
         with self.assertRaises(RuntimeError):
             receiver.recv(128)


 if __name__ == '__main__':
     unittest.main()

Evolution

Now this code is just a starting point. To make it more general, I see that we need to fix the following problems:

  • Interface: returning an empty message when interrupted is not very good; it is better to signal the interruption with an exception.
  • Generalization: we should only need a single function to call before socket.recv(); extending the interrupt to the other recv methods then becomes very simple.
  • Portability: to make a Windows port simple, we must isolate the asynchronous notification in its own object, so that we can choose the right implementation for our operating system.

First of all, we change test_interrupt_async() to check for an exception instead of an empty message:

 from interruptable_socket import InterruptException


 def test_interrupt_async(self):
     """``recv()`` must raise ``InterruptException`` once interrupted."""
     receiver = self.create_receiver()
     started_at = time.time()
     interrupter = Timer(0.1, receiver.interrupt)
     with self.assertRaises(InterruptException):
         interrupter.start()
         receiver.recv(128)
     # The interrupt fires 0.1 s after the timer starts; check recv() blocked
     # for about that long.
     waited = time.time() - started_at
     self.assertGreaterEqual(waited, 0.095)
     self.assertLess(waited, 0.11)

After that, we can replace return '' with raise InterruptException , and the tests will pass again.

A version that is ready to be extended could be:

interruptable_socket.py

 import os
 import select
 import socket


 class InterruptException(Exception):
     """Raised when a wait for data was aborted by ``interrupt()``."""


 class InterruptableUdpSocketReceiver(object):
     """UDP receiver whose blocking ``recv`` can be aborted from another thread.

     The waiting and the notification are delegated to ``AsyncInterrupt``;
     this class only owns the socket and forwards the calls. Release the
     socket and the notification pipe with ``close()`` or by using the
     instance as a context manager.
     """

     def __init__(self, host, port):
         """Create the UDP socket and its interrupt helper.

         Args:
             host: Address passed to ``socket.bind`` by ``bind()``.
             port: Port passed to ``socket.bind`` by ``bind()``.
         """
         self._host = host
         self._port = port
         self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self._async_interrupt = AsyncInterrupt(self._socket)

     def __enter__(self):
         return self

     def __exit__(self, exc_type, exc_value, traceback):
         self.close()
         return False

     def bind(self):
         """Bind the socket to the host and port given to the constructor."""
         self._socket.bind((self._host, self._port))

     def recv(self, buffersize, flags=0):
         """Block until a datagram arrives, then return it.

         Args:
             buffersize: Maximum number of bytes to read from the socket.
             flags: Flags forwarded to ``socket.recv``.

         Raises:
             InterruptException: If ``interrupt()`` ended the wait.
             RuntimeError: If the receiver was already interrupted or closed.
         """
         self._async_interrupt.wait_for_receive()
         return self._socket.recv(buffersize, flags)

     def interrupt(self):
         """Abort a pending ``recv()``; the receiver cannot be reused after."""
         self._async_interrupt.interrupt()

     def close(self):
         """Release the socket and the notification pipe (idempotent)."""
         try:
             self._socket.close()
         finally:
             self._async_interrupt.close()


 class AsyncInterrupt(object):
     """Pipe-based notifier that lets another thread abort a wait on a descriptor."""

     def __init__(self, descriptor):
         """Create the pipe used to wake up a wait on ``descriptor``.

         Args:
             descriptor: Object accepted by ``select.select`` (here the UDP
                 socket) whose readability is being waited for.
         """
         self._read, self._write = os.pipe()
         self._interrupted = False
         self._closed = False
         self._descriptor = descriptor

     def interrupt(self):
         """Mark this object as interrupted and wake up a pending wait."""
         self._interrupted = True
         self._notify()

     def wait_for_receive(self):
         """Block until the descriptor is readable or an interrupt arrives.

         Raises:
             RuntimeError: If already interrupted or closed.
             InterruptException: If the wait ended without the descriptor
                 having become readable.
         """
         if self._interrupted:
             raise RuntimeError("Cannot be reused")
         readable, _, _ = select.select(
             [self._read, self._descriptor], [], [self._descriptor])
         if self._descriptor not in readable:
             raise InterruptException

     def close(self):
         """Close both ends of the pipe.

         Safe to call more than once. Afterwards ``wait_for_receive()`` raises
         ``RuntimeError``. The watched descriptor is not closed here.
         """
         if self._closed:
             return
         self._closed = True
         self._interrupted = True
         try:
             os.close(self._read)
         finally:
             os.close(self._write)

     def _notify(self):
         # Never write to a descriptor we already closed: its number may have
         # been handed out again to an unrelated file.
         if not self._closed:
             os.write(self._write, "I".encode())


 # Backward-compatible alias for the original, misspelled class name.
 AsycInterrupt = AsyncInterrupt

Now wrapping more of the recv methods, implementing a Windows version, or taking care of socket timeouts becomes very simple.

+3
source

Implement the quit command on the server and client sockets. Something like this should work:

 Thread1: status: listening handler: quit Thread2: client exec: socket.send "quit" ---> Thread1.socket @ host:port Thread1: status: socket closed() 
0
source

Source: https://habr.com/ru/post/897492/


All Articles