Is Twisted incompatible with multiprocessing events and queues?

I am trying to simulate a network of applications that run using Twisted. As part of my simulation, I would like to synchronize certain events and be able to feed each process large amounts of data. I decided to use multiprocessing Events and Queues. However, my processes are hanging.

I wrote the sample code below to illustrate the problem. Specifically, about 95% of the time on my Sandy Bridge machine, the run_in_thread function completes, but the print_done callback is not called until I press Ctrl-C.

In addition, I can change several things in the sample code to make it work more reliably, such as reducing the number of spawned processes, calling self.ready.set from reactor_ready, or changing the deferLater delay.

I am guessing there is a race condition somewhere between the Twisted reactor and blocking multiprocessing calls such as Queue.get() or Event.wait()?

What exactly is the problem I am running into? Is there a bug in my code that I am missing? Can I fix this, or is Twisted incompatible with multiprocessing events/queues?

Secondly, would something like spawnProcess or Ampoule be the recommended alternative (as suggested in "Mix Python Twisted with multiprocessing?")?

Edit (as requested):

I ran into problems with all the reactors I tried: glib2reactor, selectreactor, pollreactor and epollreactor. The epollreactor seems to give the best results and works fine for the example below, but it still gives me the same (or a similar) problem in my application. I will continue investigating.

I am running Gentoo Linux with kernels 3.3 and 3.4 and Python 2.7, and I have tried Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0 and 12.1.0.

In addition to my Sandy Bridge machine, I see the same problem on my dual-core AMD machine.

    #!/usr/bin/python
    # -*- coding: utf-8 *-*
    from twisted.internet import reactor
    from twisted.internet import threads
    from twisted.internet import task

    from multiprocessing import Process
    from multiprocessing import Event


    class TestA(Process):

        def __init__(self):
            super(TestA, self).__init__()
            self.ready = Event()
            self.ready.clear()
            self.start()

        def run(self):
            reactor.callWhenRunning(self.reactor_ready)
            reactor.run()

        def reactor_ready(self, *args):
            task.deferLater(reactor, 1, self.node_ready)
            return args

        def node_ready(self, *args):
            print 'node_ready'
            self.ready.set()
            return args


    def reactor_running():
        print 'reactor_running'
        df = threads.deferToThread(run_in_thread)
        df.addCallback(print_done)


    def run_in_thread():
        print 'run_in_thread'
        for n in processes:
            n.ready.wait()


    def print_done(dfResult=None):
        print 'print_done'
        reactor.stop()


    if __name__ == '__main__':
        processes = [TestA() for i in range(8)]
        reactor.callWhenRunning(reactor_running)
        reactor.run()
1 answer

Short answer: yes, Twisted and multiprocessing are incompatible with each other, and you cannot reliably use them together.

On all POSIX platforms, child process management is closely tied to SIGCHLD handling. POSIX signal handlers are process-global, and there can be only one handler per signal type.

Twisted and stdlib multiprocessing cannot both have a SIGCHLD handler installed. Only one of them can. This means that only one of them can reliably manage child processes. Your example application does not control which of them wins that ability, so I would expect some non-determinism in its behavior arising from that fact.
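The "only one handler per signal" point can be seen with the stdlib signal module alone. This is a minimal sketch, not what either library actually does internally: first_library_handler and second_library_handler are hypothetical stand-ins for two libraries that each want SIGCHLD.

```python
import signal


def first_library_handler(signum, frame):
    """Hypothetical stand-in for one library's SIGCHLD handler."""


def second_library_handler(signum, frame):
    """Hypothetical stand-in for another library's SIGCHLD handler."""


def install_both():
    """Install two SIGCHLD handlers in turn and report which one survives."""
    original = signal.getsignal(signal.SIGCHLD)
    try:
        signal.signal(signal.SIGCHLD, first_library_handler)
        # signal.signal() returns the handler it just replaced.
        replaced = signal.signal(signal.SIGCHLD, second_library_handler)
        active = signal.getsignal(signal.SIGCHLD)
        return replaced, active
    finally:
        # Put back whatever was there before (None means "not set from Python").
        signal.signal(signal.SIGCHLD,
                      signal.SIG_DFL if original is None else original)
```

Calling install_both() from the main thread returns the first handler as the one that was replaced and the second as the one still active: whichever library installs its handler last silently takes over child notifications for the whole process.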

However, a more immediate problem with your example is that you load Twisted in the parent process and then use multiprocessing to fork, without exec, all of the child processes. Twisted does not support this usage. If you fork and then exec, there is no problem. However, without an exec of a new process (possibly a Python process that uses Twisted), all sorts of additional state ends up shared that Twisted does not account for. In your particular case, the shared state that causes the problem is the internal "waker fd", which is used to implement deferToThread. With the fd shared between the parent and all the children, when the parent tries to wake up its main thread to deliver the result of the deferToThread call, it most likely wakes up one of the child processes instead. The child process has nothing useful to do, so the wakeup is simply wasted. Meanwhile, the main thread in the parent never wakes up and never notices that your threaded task has completed.
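The shared-fd effect can be reproduced without Twisted. The following is a simplified sketch that assumes a POSIX system: a plain os.pipe() stands in for the reactor's waker, and the parent deliberately does not read from it until the child has reported back, so that the outcome is deterministic. In a real reactor the parent and children race for the byte.

```python
import os
import select


def wakeup_goes_to_child():
    """Return True if the parent finds no wakeup byte after the child took it."""
    waker_r, waker_w = os.pipe()    # stand-in for the reactor's waker pipe
    report_r, report_w = os.pipe()  # child -> parent: "I consumed the byte"

    pid = os.fork()
    if pid == 0:
        # Child: it inherited the very same waker pipe and reads from it.
        try:
            os.read(waker_r, 1)
            os.write(report_w, b"x")
        finally:
            os._exit(0)

    # Parent: a finishing worker thread would write one byte to wake the loop.
    os.write(waker_w, b"x")
    os.read(report_r, 1)            # wait until the child has taken the byte
    # Zero-timeout poll: is there anything left for the parent to wake up on?
    readable, _, _ = select.select([waker_r], [], [], 0)
    parent_missed_wakeup = waker_r not in readable
    os.waitpid(pid, 0)
    for fd in (waker_r, waker_w, report_r, report_w):
        os.close(fd)
    return parent_missed_wakeup
```

Here wakeup_goes_to_child() returns True: the byte written for the parent was consumed by the child, so the parent's event loop has nothing to wake up on, which matches the hang you see until Ctrl-C.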

You may be able to avoid this problem by not loading any of Twisted until you have already created the child processes. This would turn your usage into a single-process use case as far as Twisted is concerned (in each process it would be loaded fresh, and that process would then never fork again, so there is no longer any question of how fork and Twisted interact). This means not even importing Twisted until after you have created the child processes.
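A rough sketch of that ordering is below. It is untested against your setup and makes several assumptions: child_main and parent_main are names invented for illustration, the children are started with a fork-style multiprocessing start method, and the SIGCHLD conflict between Twisted and multiprocessing described above still applies in the parent.

```python
import sys
from multiprocessing import Event, Process


def child_main(ready):
    # Twisted is imported only here, inside the already-created child,
    # so each child builds its own reactor and its own waker.
    from twisted.internet import reactor, task

    reactor.callWhenRunning(task.deferLater, reactor, 1, ready.set)
    reactor.run()


def parent_main(count=8):
    events = [Event() for _ in range(count)]
    children = [Process(target=child_main, args=(e,)) for e in events]
    for child in children:
        child.daemon = True  # children are terminated when the parent exits
        child.start()

    # Only now, with every child already created, load Twisted in the parent.
    from twisted.internet import reactor, threads

    def wait_for_children():
        for e in events:
            e.wait()

    def start():
        d = threads.deferToThread(wait_for_children)
        d.addCallback(lambda _: reactor.stop())

    reactor.callWhenRunning(start)
    reactor.run()
```

The important detail is that nothing at module level imports twisted; calling parent_main() under an `if __name__ == '__main__':` guard is what finally pulls it in, after the children exist.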

Of course, this only helps you as far as Twisted is concerned. Any other libraries you use may run into similar problems (you mentioned glib2, which is a great example of another library that will completely choke if you try to use it this way).

I highly recommend not using the multiprocessing module at all. Instead, use any multi-process approach that involves fork and exec, not fork alone. Ampoule falls into this category.
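To illustrate why fork plus exec sidesteps the shared-state problem, here is a small stdlib-only sketch. It uses subprocess (which forks and then execs a fresh interpreter on POSIX) rather than Ampoule or reactor.spawnProcess, and the colorsys module is just an arbitrary stand-in for a library such as Twisted that is already loaded in the parent.

```python
import subprocess
import sys

import colorsys  # arbitrary stand-in for a library already loaded in the parent

# Code run by the freshly exec'd child interpreter.
CHILD_CODE = "import sys; print('colorsys' in sys.modules)"


def child_sees_parent_modules():
    """Return True if the exec'd child started with the parent's module loaded."""
    out = subprocess.check_output([sys.executable, "-c", CHILD_CODE])
    return out.strip() == b"True"
```

The parent has colorsys in sys.modules, but child_sees_parent_modules() returns False: the exec replaces the forked process image, so the child starts with none of the parent's in-memory library state. File descriptors are a separate matter: subprocess closes inherited descriptors by default on Python 3 (close_fds=True), while Python 2.7 defaults to close_fds=False and needs it passed explicitly. Inside a Twisted application, reactor.spawnProcess plays the corresponding role.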


Source: https://habr.com/ru/post/919378/

