I am trying to simulate a network of applications that run on Twisted. As part of my simulation, I would like to synchronize certain events and be able to feed each process large amounts of data. I decided to use multiprocessing Events and Queues. However, my processes are hanging.
I wrote the example code below to illustrate the problem. Specifically, about 95% of the time on my Sandy Bridge machine, the run_in_thread function finishes, but the print_done callback is not called until I press Ctrl-C.
In addition, I can change several things in the example code to make it work more reliably, such as reducing the number of spawned processes, calling self.ready.set from reactor_ready, or changing the deferLater delay.
I assume there is a race condition somewhere between the Twisted reactor and blocking multiprocessing calls such as Queue.get() or Event.wait()?
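To isolate the waiting pattern I mean, here is a minimal sketch that uses only the standard library and no Twisted. The names child, wait_for_all and run_demo are made up for this illustration and are not part of my application. A plain thread stands in for deferToThread, and I pass a timeout to Event.wait() so that a stuck wait would show up as a list of late indices instead of blocking forever. It is not a reproduction of the hang, since the children here do not run a reactor.

```python
from __future__ import print_function

import threading
import time
from multiprocessing import Event, Process


def child(ready, delay):
    # Hypothetical stand-in for node_ready: signal readiness after a delay.
    time.sleep(delay)
    ready.set()


def wait_for_all(events, timeout):
    # Wait on each event in turn, at most `timeout` seconds per event.
    # Returns the indices of the events that were still unset.
    return [i for i, ev in enumerate(events) if not ev.wait(timeout)]


def run_demo(count=4, delay=0.2, timeout=5.0):
    events = [Event() for _ in range(count)]
    procs = [Process(target=child, args=(ev, delay)) for ev in events]
    for p in procs:
        p.start()

    result = {}

    def waiter():
        # Runs in a helper thread, roughly as run_in_thread does.
        result['late'] = wait_for_all(events, timeout)

    t = threading.Thread(target=waiter)
    t.start()
    t.join()
    for p in procs:
        p.join()
    return result['late']


if __name__ == '__main__':
    print('late:', run_demo())
```

Swapping n.ready.wait() for a timed wait like this in run_in_thread would at least show whether the thread itself is stuck or whether the callback is simply never delivered to the reactor.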
What exactly is the problem I am facing? Is there an error in my code that I am missing? Can I fix this, or is Twisted incompatible with multiprocessing events/queues?
Secondly, would something like spawnProcess or Ampoule be the recommended alternative? (As suggested in "Mix Python Twisted with multiprocessing?")
Edit (as requested):
I ran into problems with all the reactors I tried: glib2reactor, selectreactor, pollreactor and epollreactor. The epollreactor seems to give the best results and appears to work fine for the example below, but it still gives me the same (or a similar) problem in my application. I will continue investigating.
I am running Gentoo Linux with kernels 3.3 and 3.4 and Python 2.7, and I have tried Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0 and 12.1.0.
In addition to my Sandy Bridge machine, I see the same problem on my AMD dual-core machine.
    #!/usr/bin/python
    # -*- coding: utf-8 *-*

    from twisted.internet import reactor
    from twisted.internet import threads
    from twisted.internet import task

    from multiprocessing import Process
    from multiprocessing import Event

    class TestA(Process):
        def __init__(self):
            super(TestA, self).__init__()
            self.ready = Event()
            self.ready.clear()
            self.start()

        def run(self):
            reactor.callWhenRunning(self.reactor_ready)
            reactor.run()

        def reactor_ready(self, *args):
            task.deferLater(reactor, 1, self.node_ready)
            return args

        def node_ready(self, *args):
            print 'node_ready'
            self.ready.set()
            return args

    def reactor_running():
        print 'reactor_running'
        df = threads.deferToThread(run_in_thread)
        df.addCallback(print_done)

    def run_in_thread():
        print 'run_in_thread'
        for n in processes:
            n.ready.wait()

    def print_done(dfResult=None):
        print 'print_done'
        reactor.stop()

    if __name__ == '__main__':
        processes = [TestA() for i in range(8)]
        reactor.callWhenRunning(reactor_running)
        reactor.run()