SQLAlchemy, Serializable transaction isolation, and retries in an idiomatic Python way

PostgreSQL and the SQL standard define a Serializable transaction isolation level. If you isolate transactions at this level, conflicting concurrent transactions are aborted and need to be retried.

I am familiar with the concept of transaction retries from the Plone / Zope world, where the entire HTTP request can be replayed in case of a transaction conflict. How can similar functionality be achieved with SQLAlchemy (and possibly with zope.sqlalchemy)? I tried to read the documentation of zope.sqlalchemy and the Zope transaction manager, but it is not obvious to me.

Specifically, I want something like this:

    # Try to do the stuff, if it fails because of transaction conflict do again until retry count is exceeded
    with transaction.manager(retries=3):
        do_stuff()

    # If we couldn't get the transaction through even after 3 attempts, fail with a horrible exception
2 answers

After poking around for two weeks without finding a ready-made solution, I came up with my own.

Here is a ConflictResolver class that provides the managed_transaction function decorator. You can use the decorator to mark functions as retryable. That is, if a database conflict error occurs while the function runs, the function is run again, now with higher hopes that the db transaction which caused the conflict has finished.

Source code here: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

Unit tests covering it: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master

Python 3.4+ only.

    """Serialized SQL transaction conflict resolution as a function decorator."""

    import warnings
    import logging
    from collections import Counter

    from sqlalchemy.orm.exc import ConcurrentModificationError
    from sqlalchemy.exc import OperationalError


    UNSUPPORTED_DATABASE = "Seems like we might not know how to support serializable transactions for this database. We don't know or it is untested. Thus, the reliability of the service may suffer. See transaction documentation for the details."

    #: Tuples of (Exception class, test function). Behavior copied from the _retryable_errors definitions in zope.sqlalchemy
    DATABASE_COFLICT_ERRORS = []

    try:
        import psycopg2.extensions
    except ImportError:
        pass
    else:
        DATABASE_COFLICT_ERRORS.append((psycopg2.extensions.TransactionRollbackError, None))

    # ORA-08177: can't serialize access for this transaction
    try:
        import cx_Oracle
    except ImportError:
        pass
    else:
        DATABASE_COFLICT_ERRORS.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177))

    if not DATABASE_COFLICT_ERRORS:
        # TODO: Do this when cryptoassets app engine is configured
        warnings.warn(UNSUPPORTED_DATABASE, UserWarning, stacklevel=2)

        #: XXX: We need to confirm whether this is the right way for MySQL, SQLite?
        DATABASE_COFLICT_ERRORS.append((ConcurrentModificationError, None))

    logger = logging.getLogger(__name__)


    class CannotResolveDatabaseConflict(Exception):
        """The managed_transaction decorator has given up trying to resolve the conflict.

        We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in an error-free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing.
        """


    class ConflictResolver:

        def __init__(self, session_factory, retries):
            """
            :param session_factory: `callback()` which will give us a new SQLAlchemy session object for each transaction and retry

            :param retries: The number of times we try to re-run the transaction in the case of a transaction conflict.
            """
            self.retries = retries
            self.session_factory = session_factory

            # Simple beancounting diagnostics of how well we are doing
            self.stats = Counter(success=0, retries=0, errors=0, unresolved=0)

        @classmethod
        def is_retryable_exception(cls, e):
            """Does the exception look like a database conflict error?

            Check for database driver specific cases.

            :param e: Python Exception instance
            """
            if not isinstance(e, OperationalError):
                # Not an SQLAlchemy exception
                return False

            # The driver-level exception that SQLAlchemy wrapped
            orig = e.orig

            for err, func in DATABASE_COFLICT_ERRORS:
                # Exception type matches, now compare its values
                if isinstance(orig, err):
                    if func:
                        # The test function inspects the driver-level exception
                        return func(orig)
                    else:
                        return True

            return False

        def managed_transaction(self, func):
            """SQL Serializable transaction isolation-level conflict resolution.

            When the SQL transaction isolation level is at its highest level (Serializable), the SQL database itself cannot alone resolve conflicting concurrent transactions. Thus, the SQL driver raises an exception to signal this condition.

            The ``managed_transaction`` decorator will retry running everything inside the function.

            Usage::

                # Create new session for SQLAlchemy engine
                def create_session():
                    Session = sessionmaker()
                    Session.configure(bind=engine)
                    return Session()

                conflict_resolver = ConflictResolver(create_session, retries=3)

                # Create a decorated function which can try to re-run itself in the case of conflict
                @conflict_resolver.managed_transaction
                def myfunc(session):
                    # Both threads modify the same wallet simultaneously
                    w = session.query(BitcoinWallet).get(1)
                    w.balance += 1

                # Execute the conflict sensitive code inside a managed transaction
                myfunc()

            The rules:

            - You must not swallow all exceptions within ``managed_transaction``. Example of how to handle exceptions::

                # Create a decorated function which can try to re-run itself in the case of conflict
                @conflict_resolver.managed_transaction
                def myfunc(session):
                    try:
                        my_code()
                    except Exception as e:
                        if ConflictResolver.is_retryable_exception(e):
                            # This must be passed to the function decorator, so it can attempt retry
                            raise
                        # Otherwise the exception is all yours

            - Use read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees, e.g. for displaying the total balance.

            - Never do external actions, like sending emails, inside ``managed_transaction``. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.

            - The managed transaction section should be as small and fast as possible

            - Avoid long-running transactions by splitting up a big transaction into smaller worker batches

            This implementation heavily draws inspiration from the following sources

            - http://stackoverflow.com/q/27351433/315168

            - https://gist.github.com/khayrov/6291557
            """

            def decorated_func():

                # Read attempts from app configuration
                attempts = self.retries

                while attempts >= 0:

                    session = self.session_factory()
                    try:
                        result = func(session)
                        session.commit()
                        self.stats["success"] += 1
                        return result

                    except Exception as e:
                        if self.is_retryable_exception(e):
                            # Closing the session also rolls back the failed transaction
                            session.close()
                            self.stats["retries"] += 1
                            attempts -= 1
                            if attempts < 0:
                                self.stats["unresolved"] += 1
                                raise CannotResolveDatabaseConflict("Could not replay the transaction {} even after {} attempts".format(func, self.retries)) from e
                            continue
                        else:
                            session.rollback()
                            self.stats["errors"] += 1
                            # All other exceptions should fall through
                            raise

            return decorated_func
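To see the retry control flow on its own, without a database, here is a minimal standard-library sketch. It is not the ConflictResolver code: `FakeConflict`, `FakeSession`, `retry_on_conflict` and `flaky` are hypothetical stand-ins invented for illustration, and the conflict is simulated rather than raised by a real driver.

```python
import functools


class FakeConflict(Exception):
    """Hypothetical stand-in for a driver-level serialization failure."""


class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self):
        self.committed = False
        self.closed = False

    def commit(self):
        self.committed = True

    def close(self):
        self.closed = True


def retry_on_conflict(session_factory, retries):
    """Build a decorator that re-runs the function when FakeConflict is raised.

    Each attempt gets a fresh session; at most ``retries + 1`` attempts are made.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper():
            last_error = None
            for _ in range(retries + 1):
                session = session_factory()
                try:
                    result = func(session)
                    session.commit()
                    return result
                except FakeConflict as e:
                    # Remember the conflict and fall through to the next attempt
                    last_error = e
                finally:
                    # Always release the session, whether the attempt succeeded or not
                    session.close()
            raise RuntimeError("gave up after {} attempts".format(retries + 1)) from last_error
        return wrapper
    return decorator


calls = []


@retry_on_conflict(FakeSession, retries=3)
def flaky(session):
    # Simulate a conflict on the first two attempts, then succeed
    calls.append(session)
    if len(calls) < 3:
        raise FakeConflict("simulated conflict")
    return "done"


outcome = flaky()
```

Any exception other than the simulated conflict propagates immediately, which mirrors the rule that only conflict errors should trigger a replay.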

Postgres and Oracle conflict errors are marked as retryable by zope.sqlalchemy. Set the isolation level in the engine configuration, and the transaction retry logic in pyramid_tm or Zope will work.
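As a rough sketch of the engine configuration part only, SQLAlchemy's `create_engine()` accepts an `isolation_level` argument. The in-memory SQLite URL below is an assumption made so the snippet runs without a server; for PostgreSQL only the URL would change. Wiring up zope.sqlalchemy and pyramid_tm is not shown here.

```python
from sqlalchemy import create_engine

# Assumption: in-memory SQLite keeps the sketch runnable without a server.
# For PostgreSQL only the URL changes, e.g. the hypothetical
# "postgresql://user:password@localhost/mydb".
engine = create_engine("sqlite://", isolation_level="SERIALIZABLE")

# Every connection handed out by this engine uses the configured level
with engine.connect() as connection:
    level = connection.get_isolation_level()
```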


Source: https://habr.com/ru/post/1208555/

