A good way to handle this asynchronous interrupt is with an old pipe trick. You can create a channel and use select
/ poll
for both sockets and channels: now that you need an interrupt receiver, you can simply send a char to the channel.
- pros:
- It can work for both UDP and TCP
- Is the protocol agnostic
- minuses:
- pipe selection / polling is not available on Windows, in which case you should replace it with another UDP socket, which is used as a notification channel
starting point
interruptable_socket.py
import os import socket import select class InterruptableUdpSocketReceiver(object): def __init__(self, host, port): self._host = host self._port = port self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._r_pipe, self._w_pipe = os.pipe() self._interrupted = False def bind(self): self._socket.bind((self._host, self._port)) def recv(self, buffersize, flags=0): if self._interrupted: raise RuntimeError("Cannot be reused") read, _w, errors = select.select([self._r_pipe, self._socket], [], [self._socket]) if self._socket in read: return self._socket.recv(buffersize, flags) return "" def interrupt(self): self._interrupted = True os.write(self._w_pipe, "I".encode())
Test suite:
test_interruptable_socket.py
import socket from threading import Timer import time from interruptable_socket import InterruptableUdpSocketReceiver import unittest class Sender(object): def __init__(self, destination_host, destination_port): self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self._dest = (destination_host, destination_port) def send(self, message): self._socket.sendto(message, self._dest) class Test(unittest.TestCase): def create_receiver(self, host="127.0.0.1", port=3010): receiver = InterruptableUdpSocketReceiver(host, port) receiver.bind() return receiver def create_sender(self, host="127.0.0.1", port=3010): return Sender(host, port) def create_sender_receiver(self, host="127.0.0.1", port=3010): return self.create_sender(host, port), self.create_receiver(host, port) def test_create(self): self.create_receiver() def test_recv_async(self): sender, receiver = self.create_sender_receiver() start = time.time() send_message = "TEST".encode('UTF-8') Timer(0.1, sender.send, (send_message, )).start() message = receiver.recv(128) elapsed = time.time()-start self.assertGreaterEqual(elapsed, 0.095) self.assertLess(elapsed, 0.11) self.assertEqual(message, send_message) def test_interrupt_async(self): receiver = self.create_receiver() start = time.time() Timer(0.1, receiver.interrupt).start() message = receiver.recv(128) elapsed = time.time()-start self.assertGreaterEqual(elapsed, 0.095) self.assertLess(elapsed, 0.11) self.assertEqual(0, len(message)) def test_exception_after_interrupt(self): sender, receiver = self.create_sender_receiver() receiver.interrupt() with self.assertRaises(RuntimeError): receiver.recv(128) if __name__ == '__main__': unittest.main()
Evolution
Now this code is just a starting point. To make it more general, I see that we need to fix the following problems:
- Interface : returning an empty message in case of interruption is not very good, it is better to use an exception to handle it.
- Generalization : we should only have a function to call before
socket.recv()
, distributing interrupts to other recv
methods becomes very simple - Portability : to make a simple port for Windows, we must isolate the asynchronous notification in the object in order to choose the correct implementation for our operating system.
First of all, we change test_interrupt_async()
to check for an exception instead of an empty message:
from interruptable_socket import InterruptException def test_interrupt_async(self): receiver = self.create_receiver() start = time.time() with self.assertRaises(InterruptException): Timer(0.1, receiver.interrupt).start() receiver.recv(128) elapsed = time.time()-start self.assertGreaterEqual(elapsed, 0.095) self.assertLess(elapsed, 0.11)
After that, we can replace return ''
with raise InterruptException
, and the tests will pass again.
The finished version for the extension can be:
interruptable_socket.py
import os import socket import select class InterruptException(Exception): pass class InterruptableUdpSocketReceiver(object): def __init__(self, host, port): self._host = host self._port = port self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._async_interrupt = AsycInterrupt(self._socket) def bind(self): self._socket.bind((self._host, self._port)) def recv(self, buffersize, flags=0): self._async_interrupt.wait_for_receive() return self._socket.recv(buffersize, flags) def interrupt(self): self._async_interrupt.interrupt() class AsycInterrupt(object): def __init__(self, descriptor): self._read, self._write = os.pipe() self._interrupted = False self._descriptor = descriptor def interrupt(self): self._interrupted = True self._notify() def wait_for_receive(self): if self._interrupted: raise RuntimeError("Cannot be reused") read, _w, errors = select.select([self._read, self._descriptor], [], [self._descriptor]) if self._descriptor not in read: raise InterruptException def _notify(self): os.write(self._write, "I".encode())
Now it wraps around more recv
, implements a version of Windows or takes care of socket timeouts, it becomes very simple.