Celery add_periodic_task blocks Django startup in uwsgi environment

I wrote a module that dynamically adds periodic Celery tasks based on a list of dictionaries in the project settings (imported via django.conf.settings). I do this with the add_tasks function, which schedules a task to be called with a specific uuid that is given in the settings:

    def add_tasks(celery):
        for new_task in settings.NEW_TASKS:
            celery.add_periodic_task(
                new_task['interval'],
                my_task.s(new_task['uuid']),
                name='My Task %s' % new_task['uuid'],
            )
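
For illustration, settings.NEW_TASKS is a list of dictionaries along these lines (the concrete values here are made up; only the interval and uuid keys are implied by the code above):

    # settings.py (illustrative values only)
    NEW_TASKS = [
        {'interval': 300, 'uuid': 'a3f1c2d4-0000-0000-0000-000000000001'},
        {'interval': 600, 'uuid': 'a3f1c2d4-0000-0000-0000-000000000002'},
    ]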

As suggested here, I use the on_after_configure.connect signal to call the function in my celery.py:

    app = Celery('my_app')

    @app.on_after_configure.connect
    def setup_periodic_tasks(celery, **kwargs):
        from add_tasks_module import add_tasks
        add_tasks(celery)

This setup works fine for celery beat and celery worker, but breaks the setup where I use uwsgi to serve my Django application. uwsgi runs smoothly until the first time the view code submits a task with Celery's .delay() method. At that point Celery is initialized inside uwsgi, but it blocks permanently in the code above. If I run it manually from the command line and interrupt it once it blocks, I get the following (shortened) stack trace:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: 'tasks'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: 'data'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
        return obj.__dict__[self.__name__]
    KeyError: 'tasks'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      (SHORTENED HERE. Just contained the trace from the console through my call to this function)
      File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
        my_task.s(new_task['uuid']),
      File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
        return getattr(self._get_current_object(), name)
      File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
        return loc(*self.__args, **self.__kwargs)
      File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
        return app.tasks[
      File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
        value = obj.__dict__[self.__name__] = self.__get(obj)
      File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
        self.finalize(auto=True)
      File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
        with self._finalize_mutex:

There seems to be a problem with acquiring a mutex.
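
For completeness, the view code that triggers the hang is nothing special; it boils down to a plain .delay() call (hypothetical sketch, not the actual project code):

    # views.py (hypothetical sketch)
    from django.http import HttpResponse
    from myapp.tasks import my_task

    def trigger(request):
        my_task.delay('some-uuid')  # uwsgi hangs on this call
        return HttpResponse('queued')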

As a workaround, I currently check whether sys.argv[0] contains uwsgi and, if so, skip adding the periodic tasks, since only beat needs the schedule. But I would like to understand what is actually going wrong here so I can solve the problem more permanently.
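
A minimal sketch of that workaround, assuming the check sits at the top of add_tasks (this is my reconstruction, not the exact project code):

    import sys
    from django.conf import settings
    # my_task is the same task imported in the original module

    def add_tasks(celery):
        # Workaround (sketch): skip scheduling inside the uwsgi web process;
        # only celery beat actually needs the periodic schedule.
        if 'uwsgi' in sys.argv[0]:
            return
        for new_task in settings.NEW_TASKS:
            celery.add_periodic_task(
                new_task['interval'],
                my_task.s(new_task['uuid']),
                name='My Task %s' % new_task['uuid'],
            )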

Could this problem have something to do with uwsgi's multi-threaded or multi-process model, where one thread/process holds a mutex that another one needs?

I would be grateful for any tips that will help me solve the problem. Thanks.

I am using Django 1.11.7 and Celery 4.1.0.

Edit 1

I created a minimal setup that reproduces the problem:

celery.py:

    import os
    from celery import Celery
    from django.conf import settings
    from myapp.tasks import my_task

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

    app = Celery('my_app')

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(
            60,
            my_task.s(),
            name='Testtask'
        )

    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py:

    from celery import shared_task

    @shared_task()
    def my_task():
        print('ran')

Make sure CELERY_TASK_ALWAYS_EAGER = False and you have a working message queue.
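
For reference, a minimal settings sketch for the repro could look like this (the Redis broker URL is only an assumption; any working broker will do):

    # settings.py (sketch; the broker URL is an assumption)
    CELERY_TASK_ALWAYS_EAGER = False                # tasks must really go through the broker
    CELERY_BROKER_URL = 'redis://localhost:6379/0'  # hypothetical broker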

Run:

 ./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()' 

Wait about 10 seconds before interrupting to see the above error.

2 answers

So, I found that the @shared_task decorator is what creates the problem. I can work around it by declaring the task right inside the function that is called by the signal, like this:

    def add_tasks(celery):

        @celery.task
        def my_task(uuid):
            print(uuid)

        for new_task in settings.NEW_TASKS:
            celery.add_periodic_task(
                new_task['interval'],
                my_task.s(new_task['uuid']),
                name='My Task %s' % new_task['uuid'],
            )

This solution actually works for me, but I have one more problem: I use this code in a pluggable app, so I cannot directly access the Celery app outside the signal handler, yet I would also like to be able to call the my_task function from other code. Since it is defined inside a function, it is not reachable outside of it, so I cannot import it anywhere.

Perhaps I can get around this by defining the task function outside the signal function and decorating it differently here and in tasks.py. I am wondering, though, whether there is a decorator other than @shared_task that I can use in tasks.py and that does not cause this problem.

The current best solution might be:

task_app.__init__.py:

    def my_task(uuid):
        # do stuff
        print(uuid)

    def add_tasks(celery):
        celery_my_task = celery.task(my_task)
        for new_task in settings.NEW_TASKS:
            celery.add_periodic_task(
                new_task['interval'],
                celery_my_task.s(new_task['uuid']),
                name='My Task %s' % new_task['uuid'],
            )

task_app.tasks.py:

    from celery import shared_task
    from task_app import my_task

    shared_my_task = shared_task(my_task)

myapp.celery.py:

    import os
    from celery import Celery
    from django.conf import settings

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

    app = Celery('my_app')

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        from task_app import add_tasks
        add_tasks(sender)

    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Could you try the @app.on_after_finalize.connect signal instead?

Here is a quick snippet from a working project using celery==4.1.0, Django==2.0, django-celery-beat==1.1.0 and django-celery-results==1.0.1:

    @app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        """
        setup of periodic task :py:func:`shopify_data_fetcher.celery.fetch_shopify`
        based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE
        """
        for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items():
            sender.add_periodic_task(
                task_config['schedule'],
                fetch_shopify.s(**task_config['kwargs']),
                name=task_name
            )

part of CELERY_BEAT_SCHEDULE:

    CELERY_BEAT_SCHEDULE = {
        'fetch_shopify_orders': {
            'task': 'shopify.tasks.fetch_shopify',
            'schedule': crontab(hour="*/3", minute=0),
            'kwargs': {
                'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS
            }
        }
    }
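
The snippet above presupposes a fetch_shopify task; a hypothetical sketch of what it might look like (the real implementation is not shown here):

    # shopify/tasks.py (hypothetical sketch)
    from celery import shared_task

    @shared_task(name='shopify.tasks.fetch_shopify')
    def fetch_shopify(resource_name):
        # fetch the given resource from the Shopify API (details omitted)
        print('fetching %s' % resource_name)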

Source: https://habr.com/ru/post/1273316/

