I wrote a module that dynamically adds periodic Celery tasks based on a list of dictionaries in the project settings (imported via django.conf.settings). I do this with an add_tasks function, which schedules a task to be called with a specific uuid that is given in the settings:
def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )
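An entry in NEW_TASKS looks roughly like this (the values are made up for illustration; add_tasks only reads the 'interval' and 'uuid' keys):

# settings.py -- example entries; interval is in seconds.
NEW_TASKS = [
    {'interval': 60.0, 'uuid': '3c9c0e81-4a0f-4bfa-9f0a-6a0e2a4c1a11'},
    {'interval': 300.0, 'uuid': '1b7a8df2-8a2e-4f05-b0b6-6c0f5c7c2b22'},
]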
As suggested here, I use the on_after_configure.connect signal to call the function in my celery.py:
app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from add_tasks_module import add_tasks
    add_tasks(sender)
This setup works fine for celery beat and celery worker, but breaks my setup where I serve my Django application with uwsgi. uwsgi runs smoothly until the first time a view submits a task with Celery's .delay() method. At that point Celery is initialized inside uwsgi, but blocks permanently in the code above. If I run it manually from the command line and interrupt it while it is blocked, I get the following (shortened) stack trace:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  (SHORTENED HERE. Just contained the trace from the console through my call to this function)
  File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
    my_task.s(new_task['uuid']),
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
    with self._finalize_mutex:
There seems to be a problem with acquiring a mutex.
I am currently using a workaround that detects whether sys.argv[0] contains uwsgi and, if so, skips adding the periodic tasks, since only beat needs them. Still, I would like to understand what is actually going wrong here, because the block appears to be permanent.
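For completeness, the workaround looks roughly like this (a sketch; the import path for my_task is an assumption):

import sys

from django.conf import settings

# Assumed location of the shared task for this sketch.
from myapp.tasks import my_task


def add_tasks(celery):
    # Workaround: when running under uwsgi, skip the registration
    # entirely -- only beat needs the periodic tasks.
    if 'uwsgi' in sys.argv[0]:
        return
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )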
Could this problem have something to do with uwsgi's multi-process or multi-threaded execution, where one process or thread holds a mutex that another one needs?
I would be grateful for any tips that will help me solve the problem. Thanks.
I use: Django 1.11.7 and Celery 4.1.0
Edit 1
I created a minimal setup for this problem:
celery.py:
import os

from celery import Celery
from django.conf import settings

from myapp.tasks import my_task
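For the deadlock to occur, the file also needs the app and the signal handler shown earlier; something along these lines (the DJANGO_SETTINGS_MODULE value and project name are assumptions for this sketch):

# Remainder of the minimal celery.py: standard app setup plus the
# signal handler from above, now registering my_task directly.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # my_task.s() resolves the task proxy, which is what triggers
    # the blocked finalize() in the traceback above.
    sender.add_periodic_task(10.0, my_task.s(), name='My minimal task')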
tasks.py:
from celery import shared_task


@shared_task()
def my_task():
    print('ran')
Make sure CELERY_TASK_ALWAYS_EAGER = False and you have a working message queue.
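In settings.py that means something like the following (the Redis broker URL is just an example; any reachable broker will do):

# settings.py -- tasks must go through a real broker, not run eagerly.
CELERY_TASK_ALWAYS_EAGER = False
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # example broker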
Run:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
Wait about 10 seconds before interrupting (Ctrl+C) to see the above error.