How to add a timeout to twisted deferment

  from twisted.internet import reactor
 from twisted.internet import threads
 from twisted.internet import defer
 import time

 def worker (arg):
     print 'Hello world'
      time.sleep (10)
     return 1

 def run ():
     print 'Starting workers'
     l = []
     for x in range (2):
         l.append (threads.deferToThread (worker, x))
     return defer.DeferredList (l)

 def res (results):
     print results
     reactor.stop ()

 d = run ()
 d.addCallback (res)
 reactor.run ()

How to stop workers with a timeout?

+6
source share
4 answers

Threads cannot be interrupted if they do not cooperate with you. time.sleep(10) not going to cooperate, so I don’t think you can interrupt this worker. If you have another kind of employee who has several discrete phases or is working on some tasks in a loop, you can do something like this:

 def worker(stop, jobs): for j in jobs: if stop: break j.do() stop = [] d = deferToThread(worker) # This will make the list eval to true and break out of the loop. stop.append(None) 

This is also not a hang. This is how threads work in Python.

+5
source

While it is not possible to interrupt threads, Deferred can be stopped using the cancel function, which I think is available in Twisted 10.1.0 and later.

I used the following class to do Deferrals that refer to a specific function if Deferment has not quit after a while. This may be useful for someone who has the same question as the OP subject.

EDIT: As suggested below, it is best not to inherit from defer.Deferred . So I changed the code to use a wrapper that achieves the same effect.

 class DeferredWrapperWithTimeout(object): ''' Holds a deferred that allows a specified function to be called-back if the deferred does not fire before some specified timeout. ''' def __init__(self, canceller=None): self._def = defer.Deferred(canceller) def _finish(self, r, t): ''' Function to be called (internally) after the Deferred has fired, in order to cancel the timeout. ''' if ( (t!=None) and (t.active()) ): t.cancel() return r def getDeferred(self): return self._def def addTimeoutCallback(self, reactr, timeout, callUponTimeout, *args, **kw): ''' The function 'callUponTimeout' (with optional args or keywords) will be called after 'timeout' seconds, unless the Deferred fires. ''' def timeoutCallback(): self._def.cancel() callUponTimeout(*args, **kw) toc = reactr.callLater(timeout, timeoutCallback) return self._def.addCallback(self._finish, toc) 

Example callback before timeout:

 from twisted.internet import reactor from DeferredWithTimeout import * dw = DeferredWrapperWithTimeout() d = dw.getDeferred() def testCallback(x=None): print "called" def testTimeout(x=None): print "timedout" d.addCallback(testCallback) dw.addTimeoutCallback(reactor, 20, testTimeout, "to") reactor.callLater(2, d.callback, "cb") reactor.run() 

Prints "called" and nothing more.

Example timeout before callback:

 from twisted.internet import reactor from DeferredWithTimeout import * dw = DeferredWrapperWithTimeout() d = dw.getDeferred() def testCallback(x=None): print "called" def testTimeout(x=None): print "timedout" d.addCallback(testCallback) dw.addTimeoutCallback(reactor, 20, testTimeout, "to") reactor.run() 

Prints a "timeout" after 20 seconds and nothing more.

+3
source

Well, my answer is not about threads, but as said, you can implement the timeout function as a separate helper:

 from twisted.internet import defer def add_watchdog(deferred, timeout=0.05): def callback(value): if not watchdog.called: watchdog.cancel() return value deferred.addBoth(callback) from twisted.internet import reactor watchdog = reactor.callLater(timeout, defer.timeout, deferred) d = defer.Deferred() add_watchdog(d) 

You can then catch defer.TimeoutError in deferred errback if you need to.

0
source

We do it this way using a decorator. The advantage of this method is that the delay is canceled when the wait time is reached. This should somehow become part of the Twisted imho library.

 from twisted.internet import defer, reactor def timeout(secs): """Decorator to add timeout to Deferred calls""" def wrap(func): @defer.inlineCallbacks def _timeout(*args, **kwargs): raw_d = func(*args, **kwargs) if not isinstance(raw_d, defer.Deferred): defer.returnValue(raw_d) timeout_d = defer.Deferred() times_up = reactor.callLater(secs, timeout_d.callback, None) try: raw_result, timeout_result = yield defer.DeferredList( [raw_d, timeout_d], fireOnOneCallback=True, fireOnOneErrback=True, consumeErrors=True) except defer.FirstError as e: # Only raw_d should raise an exception assert e.index == 0 times_up.cancel() e.subFailure.raiseException() else: # timeout if timeout_d.called: raw_d.cancel() raise Exception("%s secs have expired" % secs) # no timeout times_up.cancel() defer.returnValue(raw_result) return _timeout return wrap 
0
source

Source: https://habr.com/ru/post/886130/


All Articles