Here's how you can do it with Mongo.
NOTE. I had to make the design a little more forgiving, since Celery's tasks do not guarantee the exact eta moment, or countdown ends.
In addition, Mongo index indices are only cleaned every minute or so; Thus, you cannot base a design around deleted records at the moment eta gets up.
Anyway, the stream looks something like this:
- Client code calls
my_task . preflight increments the call counter and returns it as flight_id_my_task set after TTL seconds.- When
_my_task launched, it checks to see if it continues with flight_id . If it is not, it is interrupted. - ... after some time ... mongo cleans up obsolete entries in the collection, through an expiring index.
@celery.task(track_started=False, ignore_result=True) def my_task(my_arg): flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL) _my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL) @celery.task(track_started=False, ignore_result=True) def _my_task(my_arg, flight_id=None): if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id): return
Library Code:
TTL = 5 * 60 # Run tasks after 5 minutes EXPIRY = 6 * TTL # This needs to be much larger than TTL. # We need to store a list of task-executions currently pending inflight_collection = db['celery_In_Flight'] inflight_collection.create_index([('fn', pymongo.ASCENDING,), ('key', pymongo.ASCENDING,)]) inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY) def preflight(collection, fn, key, ttl): eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl) result = collection.find_one_and_update({ 'fn': fn, 'key': key, }, { '$set': { 'eta': eta }, '$inc': { 'flightId': 1 } }, upsert=True, return_document=pymongo.ReturnDocument.AFTER) print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId']) return result['flightId'] def check_for_takeoff(collection, fn, key, flight_id): result = collection.find_one({ 'fn': fn, 'key': key }) ready = result is None or result['flightId'] == flight_id print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready) return ready