I have a problem with every insert query (even a small one) that runs asynchronously in a Celery task. In sync mode the insert works perfectly, but when the task runs via apply_async(), I get the following:
OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',)
Traceback:
    Traceback (most recent call last):
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
        return self.run(*args, **kwargs)
      File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
        send_new_comment_reply_notifications_method(comment_id)
      File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
        comment_type='comment_reply'
      File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
        CommentsFeed(**kwargs).save()
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
        consistency=self.__consistency__).save()
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
        self._execute(insert)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
        tmp = execute(q, consistency_level=self._consistency)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
        result = session.execute(query, params)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
        result = future.result(timeout)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
        raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
    OperationTimedOut: errors={}, last_host=***.***.*.***
Does anyone have any ideas about the problem?
What I have found so far: when cassandra-driver executes the query, it raises an OperationTimedOut error, even though the query is very small. The problem occurs only inside Celery tasks.
UPDATE:
I wrote a minimal test task, and it raises the same error:
    @celery.task()
    def test_task_with_cassandra():
        from app import cassandra_session
        cassandra_session.execute('use news_feed')
        return 'Done'
UPDATE 2: I changed the task to set up the connection inside the task itself:
    @celery.task()
    def test_task_with_cassandra():
        from cqlengine import connection
        connection.setup(app.config['CASSANDRA_SERVERS'],
                         port=app.config['CASSANDRA_PORT'],
                         default_keyspace='test_keyspace')
        from .models import Feed
        Feed.objects.count()
        return 'Done'
This fails with the following:
NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)})
From the shell, I can connect to the same server without any problem.
UPDATE 3: The following comes from a since-deleted thread on a GitHub issue (I found it in my email), and it worked for me as well:
Here's how, in essence, I connect CQLengine to Celery:
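    from celery import Celery
    from celery.signals import worker_process_init, beat_init
    from cqlengine import connection
    from cqlengine.connection import (
        cluster as cql_cluster, session as cql_session)


    def cassandra_init(**kwargs):
        """ Initialize a clean Cassandra connection. """
        # Celery passes keyword arguments (sender, signal, ...) to signal handlers.
        if cql_cluster is not None:
            cql_cluster.shutdown()
        if cql_session is not None:
            cql_session.shutdown()
        # Pass the same hosts/keyspace arguments as in UPDATE 2.
        connection.setup()


    # Assumed wiring (implied by the imports, not shown in the quoted snippet):
    # run the initializer in every worker process and in the beat process.
    worker_process_init.connect(cassandra_init)
    beat_init.connect(cassandra_init)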
This is crude, but it works. Should I add this snippet to the FAQ?
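The idea behind that workaround, as far as I can tell, is that a connection opened before Celery forks its worker processes should not be reused inside the children, so each process has to open its own. Below is a minimal, library-independent sketch of that idea using only the standard library. The names `PerProcessResource`, `factory`, and `get_pid` are hypothetical and exist only for illustration; in a real worker, the factory would be whatever opens your Cassandra connection (for example, the `connection.setup(...)` call from UPDATE 2).

```python
import os


class PerProcessResource(object):
    """Lazily create a resource and rebuild it when the process ID changes.

    Hypothetical helper for illustration; not part of Celery, cqlengine,
    or cassandra-driver.
    """

    def __init__(self, factory, get_pid=os.getpid):
        self._factory = factory    # callable that opens a fresh resource
        self._get_pid = get_pid    # injectable so the behavior can be tested
        self._resource = None
        self._owner_pid = None

    def get(self):
        pid = self._get_pid()
        if self._resource is None or self._owner_pid != pid:
            # Either nothing has been opened yet, or this is a forked child
            # that inherited the parent's object: open a new one instead.
            self._resource = self._factory()
            self._owner_pid = pid
        return self._resource


def open_placeholder_session():
    # Stand-in for the real connection setup; returns a dummy object.
    return object()


session_holder = PerProcessResource(open_placeholder_session)
```

Inside a task you would then call `session_holder.get()` rather than importing a session created at module import time. Note that this sketch only drops the inherited object; unlike the quoted snippet, it does not shut it down.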