After going through a basic example for celery cones (works fine, as far as I can tell), I'm trying to integrate this into my own project. I mainly use this below:
from flask import Blueprint, jsonify, request, session from flask.views import MethodView from celery.decorators import task blueprint = Blueprint('myapi', __name__) class MyAPI(MethodView): def get(self, tag): return get_resource.apply_async(tag) @task(name="get_task") def get_resource(tag): pass
with the same setting as in the example, I get this error:
Traceback (most recent call last): File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1518, in __call__ return self.wsgi_app(environ, start_response) File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1506, in wsgi_app response = self.make_response(self.handle_exception(e)) File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1504, in wsgi_app response = self.full_dispatch_request() File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1264, in full_dispatch_request rv = self.handle_user_exception(e) File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1262, in full_dispatch_request rv = self.dispatch_request() File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1248, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 84, in view return self.dispatch_request(*args, **kwargs) File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 151, in dispatch_request return meth(*args, **kwargs) File "/x/api/modules/document/document.py", line 14, in get return get_resource.apply_async(tag) File "/x/venv/lib/python2.7/site-packages/celery/app/task/__init__.py", line 449, in apply_async publish = publisher or self.app.amqp.publisher_pool.acquire(block=True) File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 657, in acquire R = self.prepare(R) File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare p = p() File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda> return lambda: self.create_producer() File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 265, in create_producer pub = self.app.amqp.TaskPublisher(conn, auto_declare=False) File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 328, in TaskPublisher return TaskPublisher(*args, **self.app.merge(defaults, kwargs)) File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 158, in __init__ super(TaskPublisher, self).__init__(*args, **kwargs) File "/x/venv/lib/python2.7/site-packages/kombu/compat.py", line 61, in __init__ super(Publisher, self).__init__(connection, self.exchange, **kwargs) File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 79, in __init__ self.revive(self.channel) File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 168, in revive channel = channel.default_channel File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 581, in default_channel self.connection File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 574, in connection self._connection = self._establish_connection() File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 533, in _establish_connection conn = self.transport.establish_connection() File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 279, in establish_connection connect_timeout=conninfo.connect_timeout) File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 89, in __init__ super(Connection, self).__init__(*args, **kwargs) File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__ self.transport = create_transport(host, connect_timeout, ssl) File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport return TCPTransport(host, connect_timeout) File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__ raise socket.error, msg error: [Errno 111] Connection refused
I use redis, and if I install rabbitmq, I get another error, but I do not understand this right now - the broker must be redis, but it does not find it or what? Can someone let me more understand what is going on here? Do I need to import something else etc. The fact is that there is very little behind the bare bone, and for me this does not make sense.
The most that I was able to determine how, in the Api module, it does not have access to the "celery", and when it goes to try to put the data there, when at the application level, the celery falls into some default values ββthat are not set, because I specify redis. Just a hunch. I could not import the information into the module, I only determined that calling something "celery" (for example, output celery.conf) from the application was causing an error - although I could import celery.task.
This is the broker configuration used by the application, right from the example:
BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = "redis" CELERY_REDIS_HOST = "localhost" CELERY_REDIS_PORT = 6379 CELERY_REDIS_DB = 0
EDIT:
If you want to see the demo: https://github.com/thrisp/flask-celery-example
As it turns out, the presence of BROKER_TRANSPORT = 'redis' in your settings is important for what is being transmitted, which I pass (for the setting that I set out here and in the git example), I'm not completely sure why this is not in the bits of the example, but in the ones I added, but this is - without it, he wants to drop everything into the default ampq queue.
EDIT2:
In addition, this is a pretty big deal, using the upcoming version of Celery, simplifies 10,000 problems when used with Flask, making it all unnecessary.