Thread safety in django with asynchronous tasks and redis

I have a django application that calls an asynchronous task in a set of requests (using celery). A task takes a set of queries and performs a whole bunch of operations that could potentially take up a lot of time based on the objects in it. Objects can be divided between sets of requests, so the user can send a task to a request containing objects that are already running, and that a new task should be performed only on objects that are not already running, but wait for all objects to complete before they return.

My explanations are a bit confusing, so imagine the following code:

from time import sleep import redis from celery.task import Task from someapp.models import InterestingModel from someapp.longtime import i_take_a_while class LongRunningTask(Task): def run(self, process_id, *args, **kwargs): _queryset = InterestingModel.objects.filter(process__id=process_id) r = redis.Redis() p = r.pipeline() run_check_sets = ('run_check', 'objects_already_running') # There must be a better way to do this: for o in _queryset.values_list('pk', flat=True): p.sadd('run_check') p.sdiff(run_check_sets) # Objects that need to be run p.sunion(run_check_sets) # Objects that we need to wait for p.sunionstore('objects_already_running',run_check_sets) p.delete('run_check') redis_result = p.execute() objects_to_run = redis_result[-3] objects_to_wait_for = redis_result[-2] if objects_to_run: i_take_a_while(objects_to_run) p = r.pipeline() for o in objects_to_run: p.srem('objects_already_running', o) p.execute() while objects_to_wait_for: p = r.pipeline() for o in objects_to_wait_for: p.sismember('objects_already_running',o) redis_result = p.execute() objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member] # Probably need to add some sort of timeout here or in redis sleep(30) 

I am extremely new to Redis, so my main question is is there a better way to manipulate Redis to achieve the same result. More broadly, I wonder if Redis / needs the right approach to solve this problem. There seems to be a better way for Django models to interact with Redis. Finally, I wonder if this code is really thread safe. Can anyone punch holes in my logic?

Any comments are welcome.

+4
source share
1 answer

Is it possible for you to change this a bit? In particular, I would start to perform tasks for each object, and then somewhere I stored information about your long work tasks (for example, database, cache, etc.). When each individual object was completed, it would update detailed information about the job and check if all tasks returned. If so, then you can run any code that should be run when the working task is completed.

This has the advantage that do not bind the thread on your server while you wait for other events. On the client side, you can periodically check the status of a long-term job and even use the number of objects to update the progress bar if you want.

+2
source

Source: https://habr.com/ru/post/1346464/


All Articles