Dynamically configure periodic tasks in Celery (celery beat) using add_periodic_task

I am using Celery 4.0.1 with Django 1.10 and I have problems scheduling tasks (task execution works fine). Here is the celery configuration:

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

    app = Celery('myapp')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(
        settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
    app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
    app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
    app.conf.CELERY_TASK_SERIALIZER = 'json'
    app.conf.CELERY_ACCEPT_CONTENT = ['json']
    app.conf.CELERY_IGNORE_RESULT = True
    app.conf.CELERY_DISABLE_RATE_LIMITS = True
    app.conf.BROKER_POOL_LIMIT = 2
    app.conf.CELERY_QUEUES = (
        Queue('myapp.celery_default'),
        Queue('myapp.queue1'),
        Queue('myapp.queue2'),
        Queue('myapp.queue3'),
    )

Then in tasks.py I have:

    @app.task(queue='myapp.queue1')
    def my_task(some_id):
        print("Doing something with", some_id)

In views.py, I want to schedule this task:

    def my_view(request, id):
        app.add_periodic_task(10, my_task.s(id))

Then I run these commands:

    sudo systemctl start rabbitmq.service
    celery -A myapp.celery_app beat -l debug
    celery worker -A myapp.celery_app

But the task is never scheduled. I do not see anything in the logs. The task itself works, because if I do:

    def my_view(request, id):
        my_task.delay(id)

the task is executed.

If I schedule the task statically in my configuration file instead, it works:

    app.conf.CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.my_task',
            'schedule': 10.0,
            'args': (66,)
        },
    }

I just can't schedule the task dynamically. Any idea?

1 answer

EDIT (01/13/2018):

In release 4.1.0 this issue was addressed in ticket #3958, which has since been merged.


In fact, you cannot define a periodic task at the view level, because the beat schedule setting is loaded at startup and cannot be modified at run time:

The add_periodic_task() function adds an entry to the beat_schedule setting behind the scenes; the same setting can also be used to configure periodic tasks manually:

    app.conf.CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.my_task',
            'schedule': 10.0,
            'args': (66,)
        },
    }
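Conceptually, add_periodic_task(10, my_task.s(66)) just produces a beat_schedule entry equivalent to the dict above. Here is a minimal sketch of that mapping; make_beat_entry is a hypothetical helper for illustration, not Celery's internal API:

```python
# Hypothetical helper illustrating the mapping add_periodic_task()
# performs behind the scenes; this is NOT Celery's internal code.
def make_beat_entry(name, task, schedule, args=()):
    """Build a beat_schedule-style entry for one periodic task."""
    return {
        name: {
            'task': task,          # dotted task name, e.g. 'tasks.my_task'
            'schedule': schedule,  # seconds between runs (or a crontab)
            'args': args,
        }
    }

entry = make_beat_entry('add-every-30-seconds', 'tasks.my_task', 10.0, (66,))
```

The resulting dict matches the CELERYBEAT_SCHEDULE entry shown above, which is why both approaches behave identically once beat has loaded its schedule.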

This means that if you want to use add_periodic_task(), it has to be called in the on_after_configure handler at the Celery application level, and any changes made at run time will not take effect:

    app = Celery()

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(10, my_task.s(66))

As mentioned in the documentation, the default celery beat scheduler simply keeps track of when tasks last ran:

The default scheduler is celery.beat.PersistentScheduler, which simply keeps track of the last run times in a local shelve database file.

To be able to manage periodic tasks dynamically and reschedule celery beat at runtime:

There is also the django-celery-beat extension, which stores the schedule in the Django database and provides a convenient admin interface for managing periodic tasks at runtime.

Tasks are stored in the Django database, and the scheduler reads its schedule from the task models. Whenever you update a periodic task, a counter in the task table is incremented, telling the celery beat service to reload the schedule from the database.
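To make that counter mechanism concrete, here is a minimal stdlib simulation of the reload logic; ScheduleStore and BeatLoop are illustrative stand-ins, not django-celery-beat's actual classes:

```python
# Illustrative stand-ins for the periodic-task table and the beat
# service; these are NOT django-celery-beat's real classes.
class ScheduleStore:
    """Plays the role of the task table plus its change counter."""

    def __init__(self):
        self.entries = {}
        self.version = 0  # incremented on every change, like the DB counter

    def set_task(self, name, schedule):
        self.entries[name] = schedule
        self.version += 1


class BeatLoop:
    """Plays the role of the beat service, reloading when the counter moves."""

    def __init__(self, store):
        self.store = store
        self.seen_version = -1
        self.schedule = {}

    def tick(self):
        # Reload the schedule only when the store reports a change.
        if self.store.version != self.seen_version:
            self.schedule = dict(self.store.entries)
            self.seen_version = self.store.version
            return True  # schedule reloaded
        return False     # nothing changed


store = ScheduleStore()
beat = BeatLoop(store)
store.set_task('tasks.my_task', 10.0)
reloaded_first = beat.tick()   # sees the new task, reloads
reloaded_second = beat.tick()  # no change since last tick, no reload
```

The real extension does the same comparison against a last-changed timestamp in the database on each beat iteration, which is what makes runtime updates visible without restarting beat.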

A possible solution for you may be as follows:

    import json

    from django_celery_beat.models import PeriodicTask, IntervalSchedule

    schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
    task = PeriodicTask.objects.create(
        interval=schedule,
        name='any name',
        task='tasks.my_task',
        args=json.dumps([66]),
    )

views.py

    def update_task_view(request, id):
        # assuming here that task names are unique
        task = PeriodicTask.objects.get(name="task name")
        task.args = json.dumps([id])
        task.save()
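For the database-backed schedule to be picked up at all, beat has to be started with django-celery-beat's database scheduler via the --scheduler option (assuming the app is installed, added to INSTALLED_APPS, and migrated):

```shell
celery -A myapp.celery_app beat -l info \
    --scheduler django_celery_beat.schedulers:DatabaseScheduler
```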

Source: https://habr.com/ru/post/1261202/

