Strategy for undoing deletions using SQLAlchemy

I have a number of related tables backed by Python / SQLAlchemy. If I delete a row in a particular table, I want to be able to undo that deletion at some point in the future, in case it was a mistake. I could do this with an is_deleted column and filter on it, but that gets painful when I query other tables for related data. I could add an is_deleted column to all the other tables as well, and flip them all when the row in the main table is deleted; but then every query against every table would need to filter on is_deleted. That can be done, but I am hoping for a better strategy.
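To make the pain concrete, here is a minimal sketch of the soft-delete approach described above, assuming SQLAlchemy 1.4+ and two hypothetical models (Order and Item, not from the original post): every "delete" has to fan out across related tables, and every subsequent query has to remember the filter.

```python
from sqlalchemy import Boolean, Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    is_deleted = Column(Boolean, default=False, nullable=False)

class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey("orders.id"))
    is_deleted = Column(Boolean, default=False, nullable=False)
    order = relationship("Order")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

order = Order()
session.add_all([order, Item(order=order), Item(order=order)])
session.commit()

# "Deleting" the order means flipping the flag on it and on every related row...
order.is_deleted = True
for item in session.query(Item).filter(Item.order_id == order.id):
    item.is_deleted = True
session.commit()

# ...and every query everywhere must now remember to apply the filter.
live_items = session.query(Item).filter(Item.is_deleted == False).all()
```

Nothing is ever physically removed, so restoring is trivial, but the filtering burden spreads to every query in the application.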

One thought was to move all deleted data to a separate set of tables that holds only deleted data. But it is not clear to me whether SQLAlchemy can switch the table a particular object is mapped to. I think this would be my preferred solution, but I do not know whether it can be done.

Another thought is that I could stand up a second database and copy deleted data into it. But that adds a level of administrative complexity that I would like to avoid.

Any thoughts would be appreciated.

2 answers

Many people do the "is_deleted" thing, and I agree I am not a fan of it either, although we do have a recipe for it in PreFilteredQuery.
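The PreFilteredQuery recipe referenced here predates SQLAlchemy 1.4; a rough modern equivalent of the same idea, sketched below under the assumption of SQLAlchemy 1.4+ and a hypothetical Note model, uses the documented do_orm_execute event together with with_loader_criteria so the is_deleted filter is applied to every SELECT automatically, with an explicit opt-out:

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, with_loader_criteria

Base = declarative_base()

class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    text = Column(String)
    is_deleted = Column(Boolean, default=False, nullable=False)

@event.listens_for(Session, "do_orm_execute")
def _filter_deleted(execute_state):
    # Append "is_deleted = false" to every SELECT against Note,
    # unless the caller explicitly opts out via execution_options.
    if execute_state.is_select and not execute_state.execution_options.get(
        "include_deleted", False
    ):
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                Note, Note.is_deleted == False, include_aliases=True
            )
        )

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)
session.add_all([Note(text="keep"), Note(text="gone", is_deleted=True)])
session.commit()

visible = session.query(Note).all()  # filter applied automatically
everything = (
    session.query(Note).execution_options(include_deleted=True).all()
)
```

This centralizes the soft-delete filter in one place instead of repeating it in every query, though it still leaves the flag columns in the schema.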

What you are looking for, as someone else suggested, is a "versioning" recipe. We have a comprehensive example of storing copies of data in a separate history table in the Versioned Objects example in the SQLAlchemy documentation.

Here I have adapted some of the techniques used in that example into a more direct recipe that specifically tracks only "deleted" objects, and includes a restore() method that moves a given row back into the main table. So it is not quite "SQLAlchemy lets me switch the table a particular object is mapped to"; it is more that a second mapped class, parallel to the primary one, is generated, and it can be queried and used to "reverse" a deletion. Everything below the if __name__ == '__main__' line is the proof of concept.

from sqlalchemy.orm import Session, object_session
from sqlalchemy import event


def preserve_deleted(class_):
    def copy_col(col):
        newcol = col.copy()
        newcol.constraints = set()
        return newcol

    keys = class_.__table__.c.keys()
    cols = dict(
        (col.key, copy_col(col)) for col in class_.__table__.c
    )
    cols['__tablename__'] = "%s_deleted" % class_.__table__.name

    class History(object):
        def restore(self):
            sess = object_session(self)
            sess.delete(self)
            sess.add(copy_inst(self, class_))

    hist_class = type(
        '%sDeleted' % class_.__name__,
        (History, Base),
        cols)

    def copy_inst(fromobj, tocls):
        return tocls(**dict(
            (key, getattr(fromobj, key)) for key in keys
        ))

    @event.listens_for(Session, 'before_flush')
    def check_deleted(session, flush_context, instances):
        for del_ in session.deleted:
            if isinstance(del_, class_):
                h = copy_inst(del_, hist_class)
                session.add(h)

    class_.deleted = hist_class
    return class_


if __name__ == '__main__':
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, ForeignKey, Integer, String
    from sqlalchemy.orm import relationship, Session
    from sqlalchemy import create_engine

    Base = declarative_base()

    @preserve_deleted
    class A(Base):
        __tablename__ = "a"

        id = Column(Integer, primary_key=True)
        data1 = Column(String)
        data2 = Column(String)

    @preserve_deleted
    class B(Base):
        __tablename__ = 'b'

        id = Column(Integer, primary_key=True)
        data1 = Column(String)
        a_id = Column(Integer, ForeignKey('a.id'))
        a = relationship("A")

    e = create_engine('sqlite://', echo=True)
    Base.metadata.create_all(e)

    s = Session(e)

    a1, a2, a3, a4 = \
        A(data1='a1d1', data2='a1d2'), \
        A(data1='a2d1', data2='a2d2'), \
        A(data1='a3d1', data2='a3d2'), \
        A(data1='a4d1', data2='a4d2')

    b1, b2, b3, b4 = \
        B(data1='b1', a=a1), \
        B(data1='b2', a=a1), \
        B(data1='b3', a=a3), \
        B(data1='b4', a=a4)

    s.add_all([a1, a2, a3, a4, b1, b2, b3, b4])
    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (2, ), (3, ), (4, )]

    s.delete(a2)
    s.delete(b2)
    s.delete(b3)
    s.delete(a3)
    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (4, )]

    a2_deleted = s.query(A.deleted).filter(A.deleted.id == 2).one()
    a2_deleted.restore()

    b3_deleted = s.query(B.deleted).filter(B.deleted.id == 3).one()
    a3_deleted = s.query(A.deleted).filter(A.deleted.id == 3).one()
    b3_deleted.restore()
    a3_deleted.restore()

    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (3, ), (4, )]

I would try to implement something similar to django-reversion.

The idea is to have a single table that can hold serialized data from any other table, along with information about which table each row came from, and so on.

As an example, look at django-reversion's Version model. Its content_type field refers to a Django model that holds metadata about the originating model; in your case it could simply be a char field containing the table name (a separate content-types table would be better if you have a large number of tables).

Then you add code so that after each insert or update your version table is also updated. Whenever you want to restore something, you simply fetch the serialized data from the version table and re-insert the record.
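A rough SQLAlchemy sketch of this idea, narrowed to the deletion case the question asks about: a single hypothetical deleted_rows table stores the source table name (playing the role of content_type) plus the row serialized as JSON, an event listener archives rows on delete, and a helper re-inserts them. All names here (DeletedRow, User, restore) are illustrative, not from django-reversion; assumes SQLAlchemy 1.4+ and JSON-serializable column values.

```python
import json

from sqlalchemy import Column, Integer, String, Text, create_engine, event, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DeletedRow(Base):
    """One row per deleted object, regardless of which table it came from."""
    __tablename__ = "deleted_rows"
    id = Column(Integer, primary_key=True)
    table_name = Column(String, nullable=False)  # plays the role of content_type
    data = Column(Text, nullable=False)          # serialized column values

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

@event.listens_for(Session, "before_flush")
def _archive_deletes(session, flush_context, instances):
    # Before any delete is flushed, snapshot the row into deleted_rows.
    for obj in session.deleted:
        if isinstance(obj, DeletedRow):
            continue  # do not archive the archive itself
        mapper = inspect(obj).mapper
        payload = {c.key: getattr(obj, c.key) for c in mapper.columns}
        session.add(DeletedRow(table_name=mapper.local_table.name,
                               data=json.dumps(payload)))

def restore(session, deleted_row, model):
    """Re-insert an archived row and remove it from the archive."""
    session.add(model(**json.loads(deleted_row.data)))
    session.delete(deleted_row)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = Session(engine)

u = User(name="alice")
session.add(u)
session.commit()
session.delete(u)
session.commit()

archived = session.query(DeletedRow).one()
restore(session, archived, User)
session.commit()
```

One design note: because the archive stores rows as opaque JSON rather than as mapped columns, it needs no per-table schema, at the cost of not being queryable by the original columns.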

There may be some caveats, such as many-to-many relations, cascading deletes, and so on, but I would start there.


Source: https://habr.com/ru/post/1444422/

