Calling Tornado coroutines from synchronous code

This will be a long question, therefore:

TL DR: I have a Python 2.7 threaded network server with a request handler, the call stack looks like this:

WorkerThread -> requestHandler -> func1 -> func2 -> .. -> func10 -> doStuff -> BlockingIO 

I want to use Tornado 3.0 IOLoop and change only parts of the server and IO:

 (IOLoop) -> requestHandler -> func1 -> func2 -> .. -> func10 -> (doStuff) -> (AsyncIO) 

Thus, the entire code stack between requestHandler () and func10 () will not change at all. In fact, even the doStuff () interface will not change, and it will be blocked. However, internally, it will use the AsyncIO object (which is a Tornado coroutine), and during the asynchronous I / O operation, IOLoop to execute other coroutines until the I / O operation completes.

Is it possible?




Now for a practical example:

I have a network server that receives requests and processes them using a thread pool (or a process pool, it doesn't matter how much this example goes):

 def main(): # Main entry point, called below. # Fake class, you can imagine the internals. We register a request # handler here - handleRequest() server = ThreadedServer(handler=handleRequest) # Server has a thread pool, each request is handled on a worker thread. # One thread handles network stuff and pushes requests to worker threads Server.start() def handleRequest(server_address): # This is the request handler, called in the context of a worker # thread, after a network request was received. # We call the function below. It blocks the thread until it finishes. # Not very optimal, since the blocking is network IO bound result = doStuff(server_address) # We use the result somehow, here we print it print "Request handled with result: %s" % result def doStuff(server_address): # This is called by the request handler # This is a network bound object, most of its time is spent waiting # for the network IO net_bound_object = NetBoundSyncObject(server_address) # This would block, waiting on the network, preventing the thread from # handling other requests result = net_bound_object.do_something() # We have the result, return it return result if __name__ == "__main__": main() 

Pretty simple, really.

Now, let's say, I decided that I wanted to reorganize my server to use Tornado, using tornado.gen to support asynchronous operations, so it didn't interfere so much with network IO. So this is my new code:

 def main(): # Start Tornado IOLoop, first entering TornadoServer.start() to begin # initializing the server and accept requests. # server.start is a coroutine that waits for network IO, yielding # control back to the IOLoop until something # happens. When something does, it is awakened and schedules a # request handler - handleRequest, and goes back to network IO, # yielding control. Thus, handleRequest is called. server = TornadoServer(handler=handleRequest) # fake class again IOLoop.instance().add_callback(server.start) IOLoop.instance().start() def handleRequest(server_address): # This part of the code has not been changed - just the comments. # It is now run in the context of an IOLoop callback. # We call the function above. The interface remains the same. It also seems # to block - which is fine, we want to wait for its result to continue processing. # However, we want the IOLoop to continue running somehow. result = doStuff(server_address) # We use the result somehow, here we print it print "Request handled with result: %s" % result def doStuff(server_address): # This is a network bound object, most of its time is spent waiting for # the network IO, however all its methods are coroutines and it yields # while waiting for network IO net_bound_object = NetBoundAsyncObject(server_address) # Now to the problem. # doStuff() is a facade - I don't want it to be a coroutine, I want it to hide # the implementation details and keep its previous interface. # However, NetBoundAsyncObject.do_something_async() is a coroutine, and calls # coroutines inside it. So it should be called in the context of # another coroutine: result = yield net_bound_object.do_something_async() # but this is wrong here, since we are not a coroutine. # To properly call it asynchronously, I would need to make doStuff() # a coroutine as well, breaking its interface, which would mean that # handleRequest too should now be a coroutine. Not a big change, but imagine # that instead of calling doStuff() directly, I had code like: # handleRequest -> func1 -> func2 -> func3 -> ... -> func10 -> doStuff # so now I'd have to change all these functions to be coroutines as well. # All of these functions, handleRequest and func1..10, represent a big stack # of code in my real system which is completely synchronous, CPU bound code, # so it has no IO waits anywhere, just code that needs to be run BEFORE and # AFTER the network IO bound code finishes, to properly handle the request. # It is well tested, production proven code that requires no functional change, # and that doesn't need to be a coroutine. This would be a big refactor. # In the code as it is now, result is now returned as a Future: result = net_bound_object.do_something_async() # I want to be able to do something like: IOLoop.instance().wait_for_future(result) # Letting the IOLoop run and handle other things in the meanwhile, like # network requests, and also my asynchronous code. # When it finishes, I want my wait_for_future() to return and to continue # execution with the result accessible in the future object. # Thus, the changes would be at the top (the TornadoServer vs ThreadedServer) # and the bottom (doStuff to use either NetBoundObject or NetBoundAsyncObject), # but the middle stack will remain unchanged. # Return the result of the operation return result if __name__ == "__main__": main() 

I know this is a problem in many ways, mainly due to the call stack. When we do something like:

 IOLoop.instance().wait_for_future(result) 

we have a call stack that looks like this:

 IOLoop.main_loop.start() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> other_callbacks.. 

so that we can (or even possibly) encounter situations such as:

 IOLoop.main_loop.start() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> ... 

obviously, if handleRequest itself becomes a coroutine, then when it issues it, we don’t have such deep stack problems.

In the embedded system that I once used using an unmanaged scheduler, there was no problem to return control to the scheduler without stack problems. The scheduler will take the execution context and the call stack and save them, then move to another context / stack and continue execution from there. When waiting for events / I / O, the scheduler starts and starts everything that was in the I / O cycle. I want something like this on my system, instead of changing the entire call stack above - converting TOTAL to coroutines.

Does anyone have any tips, any ideas?

+4
source share
1 answer

You can run the @ gen.coroutine decoding function synchronously using:

 @gen.coroutine def main(): # do stuff... if __name__ == '__main__': IOLoop.instance().run_sync(main) 

This starts "IOLoop", starts the function and stops the loop. https://github.com/facebook/tornado/blob/master/tornado/ioloop.py

+4
source

Source: https://habr.com/ru/post/1495592/


All Articles