Python cassandra-driver OperationTimedOut for every request in a Celery task

I have a problem with every insert request (a small request) that runs asynchronously in Celery tasks. In synchronous mode every insert completes fine, but when it runs via apply_async() I get the following:

OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',) 

Traceback:

    Traceback (most recent call last):
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
        return self.run(*args, **kwargs)
      File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
        send_new_comment_reply_notifications_method(comment_id)
      File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
        comment_type='comment_reply'
      File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
        CommentsFeed(**kwargs).save()
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
        consistency=self.__consistency__).save()
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
        self._execute(insert)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
        tmp = execute(q, consistency_level=self._consistency)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
        result = session.execute(query, params)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
        result = future.result(timeout)
      File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
        raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
    OperationTimedOut: errors={}, last_host=***.***.*.***

Does anyone have any ideas about the problem?

I found this. When executing the request, cassandra-driver raised an OperationTimedOut error, but my request is very small and the problem occurs only in Celery tasks.
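A likely cause (an assumption on my part, not something the traceback proves): the driver session is created in the parent process before Celery forks its prefork workers, so the workers inherit and share the parent's open sockets, and replies get lost between processes. The inheritance mechanism itself can be shown with a tiny stdlib-only sketch (Unix only, no Cassandra involved):

```python
import os
import socket

def fork_inherits_socket():
    """Show that a socket opened before fork is shared with the child,
    so two processes would interleave traffic on one connection."""
    parent_end, child_end = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: inherits the same open file descriptors as the parent.
        child_end.sendall(b"written-by-child")
        os._exit(0)
    os.waitpid(pid, 0)
    data = parent_end.recv(1024)
    parent_end.close()
    child_end.close()
    return data

print(fork_inherits_socket())  # b'written-by-child'
```

A driver connection created before the fork is shared the same way, which would explain why the sync path works while tasks time out.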

UPDATE:

I wrote a simple test task, and it raises the same error.

    @celery.task()
    def test_task_with_cassandra():
        from app import cassandra_session
        cassandra_session.execute('use news_feed')
        return 'Done'

UPDATE 2: I tried this:

    @celery.task()
    def test_task_with_cassandra():
        from cqlengine import connection
        connection.setup(app.config['CASSANDRA_SERVERS'],
                         port=app.config['CASSANDRA_PORT'],
                         default_keyspace='test_keyspace')
        from .models import Feed
        Feed.objects.count()
        return 'Done'

It produces the following:

 NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)}) 

From the shell I can connect to it just fine.

UPDATE 3: From a since-deleted topic in the GitHub issue tracker (found it in my email; this worked for me as well). Here's how, in essence, to hook cqlengine up to Celery:

    from celery import Celery
    from celery.signals import worker_process_init, beat_init
    from cqlengine import connection
    from cqlengine.connection import (
        cluster as cql_cluster, session as cql_session)

    def cassandra_init(*args, **kwargs):
        """ Initialize a clean Cassandra connection. """
        if cql_cluster is not None:
            cql_cluster.shutdown()
        if cql_session is not None:
            cql_session.shutdown()
        connection.setup()

    # Initialize worker context for both standard and periodic tasks.
    worker_process_init.connect(cassandra_init)
    beat_init.connect(cassandra_init)

    app = Celery()

This is crude, but it works. Should I add this snippet to the FAQ?

3 answers

I had a similar problem. It seemed to be caused by sharing the Cassandra session between tasks. I solved it by creating a session per thread. Call get_session() from your tasks, defined like this:

    import threading

    from cassandra.cluster import Cluster

    # `settings` here is the project's settings object
    # (e.g. django.conf.settings).
    thread_local = threading.local()

    def get_session():
        if hasattr(thread_local, "cassandra_session"):
            return thread_local.cassandra_session
        cluster = Cluster(settings.CASSANDRA_HOSTS)
        session = cluster.connect(settings.CASSANDRA_KEYSPACE)
        thread_local.cassandra_session = session
        return session
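The caching behaviour can be checked without a live cluster by swapping in a stub factory for `Cluster(...).connect(...)` (`make_session` is a made-up stand-in here, not a driver API):

```python
import threading

thread_local = threading.local()

def make_session():
    # Stand-in for cluster.connect(...); returns a fresh dummy object.
    return object()

def get_session():
    # Reuse the session cached on this thread, creating it on first use.
    if not hasattr(thread_local, "cassandra_session"):
        thread_local.cassandra_session = make_session()
    return thread_local.cassandra_session
```

Repeated calls on one thread return the same object, while each new thread builds its own session, so tasks running on different worker threads never share a connection.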

The other answers did not help me, but "UPDATE 3" in the question did. Here is what I ended up with (small updates to the snippet from the question):

    from celery.signals import worker_process_init
    from django.conf import settings
    from cassandra.cqlengine import connection
    from cassandra.cqlengine.connection import (
        cluster as cql_cluster, session as cql_session)

    def cassandra_init(*args, **kwargs):
        """ Initialize a clean Cassandra connection. """
        if cql_cluster is not None:
            cql_cluster.shutdown()
        if cql_session is not None:
            cql_session.shutdown()
        connection.setup(
            settings.DATABASES["cassandra"]["HOST"].split(','),
            settings.DATABASES["cassandra"]["NAME"])

    # Initialize worker context (standard tasks only).
    worker_process_init.connect(cassandra_init)

Inspired by Ron, I came up with the following code to put in tasks.py:

    import threading

    from django.conf import settings
    from cassandra.cluster import Cluster
    from celery.signals import worker_process_init, worker_process_shutdown

    thread_local = threading.local()

    @worker_process_init.connect
    def open_cassandra_session(*args, **kwargs):
        cluster = Cluster([settings.DATABASES["cassandra"]["HOST"]],
                          protocol_version=3)
        session = cluster.connect(settings.DATABASES["cassandra"]["NAME"])
        thread_local.cassandra_session = session

    @worker_process_shutdown.connect
    def close_cassandra_session(*args, **kwargs):
        session = thread_local.cassandra_session
        session.shutdown()
        thread_local.cassandra_session = None

This neat solution automatically opens and closes Cassandra sessions when a Celery worker process starts and stops.

Side note: protocol_version=3 is used because Cassandra 2.1 supports only protocol versions 3 and below.
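The effect of worker_process_init can be sketched with the stdlib `multiprocessing.Pool`, whose `initializer` argument plays a similar role: each worker process builds its own session instead of inheriting one. The dummy `object()` session and the `demo` helper are illustrative assumptions, and the fork start method makes this Unix-only:

```python
import multiprocessing as mp

def init_worker():
    # Runs once in every worker process, like Celery's worker_process_init:
    # each child creates its own "session" rather than inheriting the parent's.
    global session
    session = object()  # stand-in for Cluster(...).connect(...)

def run_task(n):
    # Would raise NameError if init_worker had not run in this process.
    assert session is not None
    return n * 2

def demo():
    ctx = mp.get_context("fork")  # prefork model, as Celery uses on Unix
    with ctx.Pool(processes=2, initializer=init_worker) as pool:
        return pool.map(run_task, [1, 2, 3])
```

Every task succeeds only because the initializer ran in each worker before any task was dispatched, which is exactly what the signal handlers above guarantee.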


Source: https://habr.com/ru/post/972333/

