SQLAlchemy Session Questions with Celery

I have scheduled several recurring tasks with Celery for our web application.

The application itself is built with the Pyramid web framework and uses ZopeTransactionExtension for session management.

In Celery, I use the application as a library and redefine the session in models with a function.

It works well, but from time to time it raises: InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction

I'm not sure what is wrong or why it raises these errors.

Code example:

In tasks.py:

    def initialize_async_session():
        import sqlalchemy
        from webapp.models import Base, set_dbsession, engine
        Session = sqlalchemy.orm.scoped_session(
            sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True)
        )
        Session.configure(bind=engine)
        session = Session()
        set_dbsession(session)
        Base.metadata.bind = engine
        return session


    @celery.task
    def rerun_scheduler():
        log.info("Starting pipeline scheduler")
        session = initialize_async_session()
        webapp.sheduledtask.service.check_for_updates(session)
        log.info("Ending pipeline scheduler")

In models.py in webapp:

    DBSession = scoped_session(sessionmaker(bind=engine,
                                            expire_on_commit=False,
                                            extension=ZopeTransactionExtension()))


    def set_dbsession(db_session=None):
        """
        This function sets the db session
        """
        global DBSession
        if db_session:
            DBSession = db_session
            log.info("session changed to {0}".format(db_session))

UPDATE:

Traceback:

    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
        self.run()
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
        result = f(*args, **kwargs)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
        result = f(*args, **kwargs)
      File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
        self.table_params.set_task_status_as_finished()
      File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
        task = Task.get_by_id(self.task_id)
      File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/models.py", line 162, in get_by_id
        return DBSession.query(cls).filter(cls.id == obj_id).first()
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2156, in first
        ret = list(self[0:1])
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2023, in __getitem__
        return list(res)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2227, in __iter__
        return self._execute_and_instances(context)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2240, in _execute_and_instances
        close_with_result=True)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2231, in _connection_from_session
        **kw)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 777, in connection
        close_with_result=close_with_result)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 781, in _connection_for_bind
        return self.transaction._connection_for_bind(engine)
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 289, in _connection_for_bind
        self._assert_is_active()
      File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 217, in _assert_is_active
        "This Session transaction has been rolled back "
    InvalidRequestError: This Session transaction has been rolled back by a nested rollback() call. To begin a new transaction, issue Session.rollback() first.

    #########################################################################

    [2013-05-30 14:32:57,782: WARNING/PoolWorker-3] Exception in thread Thread-4:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
        self.run()
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
        result = f(*args, **kwargs)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
        result = f(*args, **kwargs)
      File "/home/ranjith/wksp/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
        self.table_params.set_task_status_as_finished()
      File "/home/ranjith/wksp/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
        task = Task.get_by_id(self.task_id)
      File "/home/ranjith/wksp/mvc-service/webapp/webapp/models.py", line 166, in get_by_id
        return DBSession.query(cls).filter(cls.id == obj_id).first()
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2145, in first
        ret = list(self[0:1])
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2012, in __getitem__
        return list(res)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2216, in __iter__
        return self._execute_and_instances(context)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2229, in _execute_and_instances
        close_with_result=True)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2220, in _connection_from_session
        **kw)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 798, in connection
        close_with_result=close_with_result)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 802, in _connection_for_bind
        return self.transaction._connection_for_bind(engine)
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 281, in _connection_for_bind
        self._assert_active()
      File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 181, in _assert_active
        "This session is in 'prepared' state; no further "
    InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.
1 answer

I believe the problem is that you are trying to use your web application's SQLAlchemy session in your Celery task.

The first thing I recommend is to create two separate scoped sessions, one for your Celery application and another for your web application. Then make sure that the Celery database session is set up only once, during Celery initialization. You can use Celery's worker_init.connect to make sure the session is configured when the Celery worker starts ( http://hynek.me/articles/using-celery-with-pyramid/ ).

It is very important that your web application does not use the same database session as your Celery application.
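To make the idea of separate scoped sessions concrete, here is a toy sketch using only the standard library. It is not SQLAlchemy's implementation; `ScopedRegistry`, `FakeSession`, `WebSession`, and `WorkerSession` are hypothetical names invented for illustration. It only shows the behaviour that matters here: a scoped registry hands each thread its own object, and two registries never share one.

```python
import threading


class ScopedRegistry:
    """Toy stand-in for a scoped session: one object per thread, created lazily."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Create the object on first use in the calling thread, then reuse it.
        if not hasattr(self._local, "obj"):
            self._local.obj = self._factory()
        return self._local.obj

    def remove(self):
        # Discard the calling thread's object so the next call starts fresh.
        if hasattr(self._local, "obj"):
            del self._local.obj


class FakeSession:
    """Hypothetical placeholder for a real database session."""


# Two independent registries: one for the web app, one for the Celery worker.
WebSession = ScopedRegistry(FakeSession)
WorkerSession = ScopedRegistry(FakeSession)


def session_seen_in_thread(registry):
    """Return the object that a freshly started thread gets from the registry."""
    seen = []
    t = threading.Thread(target=lambda: seen.append(registry()))
    t.start()
    t.join()
    return seen[0]


main_worker_session = WorkerSession()
thread_worker_session = session_seen_in_thread(WorkerSession)
```

In this sketch, repeated calls in one thread return the same object, while a new thread or the other registry gets a different one. A single plain session object stored in a module-level global, as in the question's set_dbsession, has no such isolation.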

Something like this for your tasks.py file:

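    import logging

    from celery import Celery
    from celery.signals import worker_init
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    import webapp.sheduledtask.service

    log = logging.getLogger(__name__)
    celery = Celery('tasks')  # placeholder app name; use your existing Celery app

    # Celery-only scoped session, separate from the web application's DBSession.
    Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))


    @worker_init.connect
    def initialize_session(**kwargs):
        # Runs once when the worker starts; signal handlers must accept **kwargs.
        some_engine = create_engine('database_url')
        Session.configure(bind=some_engine)


    @celery.task
    def rerun_scheduler():
        log.info("Starting pipeline scheduler")
        webapp.sheduledtask.service.check_for_updates(Session)
        log.info("Ending pipeline scheduler")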
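The answer's snippet does not show what happens to the session after a task finishes. One common pattern, offered here as an assumption rather than as part of the original answer, is to discard the scoped session at the end of every task with `Session.remove()`, so that a failed transaction cannot leak into the next run. The sketch below uses an in-memory SQLite engine and a hypothetical `run_task` function in place of the real database URL and task body.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: an in-memory SQLite database stands in for the real database URL.
engine = create_engine("sqlite://")
Session = scoped_session(sessionmaker(bind=engine))


def run_task():
    """Hypothetical task body: use the session, then always discard it."""
    session = Session()
    try:
        value = session.execute(text("SELECT 1")).scalar()
        session.commit()
        return value
    except Exception:
        # Leave the session in a clean state before re-raising.
        session.rollback()
        raise
    finally:
        # Close this thread's session and drop it from the registry.
        Session.remove()


first = run_task()
second = run_task()
```

Each call to `run_task` starts from a fresh session, which is the property the recurring scheduler task needs if an earlier run ended in a rollback.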

Source: https://habr.com/ru/post/946008/

