SQLAlchemy - creating multi-threaded persistent objects, how to join in one session to avoid state conflict?

I have tens (potentially hundreds) of thousands of permanent objects that I want to generate in a multi-threaded way due to the necessary processing.

While the creation of objects occurs in separate streams (using Flask-SQLAlchemy extension flags with sessions with binding), the recording of generated objects in the database is called in 1 place after the generation is completed.

I believe that the problem is that the objects being created are part of several existing relationships, thereby causing the identification to be automatically added to the map, despite the fact that they are created in separate parallel threads without an explicit session in any of the threads.

I was hoping to keep the generated objects in a single list, and then write the entire list (using a single session object) to the database. This results in an error:

AssertionError: A conflicting state is already present in the identity map for key (<class 'app.ModelObject'>, (1L,)) 

Therefore, why I believe that the ID card is already full, because when I try to add and commit using a global session outside of the parallel code, an approval error will be raised.

The last detail is that any session object (in brackets or otherwise, since I donโ€™t quite understand how automatic adding to an identification card works in case of multithreading), I canโ€™t find a way / donโ€™t know how to get a link to them so that even if I wanted to deal with a separate session for each process, I could.

Any advice is appreciated. The only reason I am not posting the code (for now) is that it is difficult to immediately draw a working example from my application. I will post if someone really needs to see this.

+4
source share
2 answers

Each session is thread-local; in other words, there is a separate session for each thread. If you decide to transfer some instances to another thread, they will be "disconnected" from the session. Use db.session.add_all(objects) in the receiving stream to return them back.

For some reason, it looks like you are creating objects with the same identifier (primary key columns) in different threads, and then trying to send them to the database. One option is to fix why this happens so that identity is guaranteed unique. You can also try merging ; merged_object = db.session.merge(other_object, load=False) .

Edit: zzzeek's comment made me think of something else that could go on:

With Flask-SQLAlchemy, the session is bound to the application context. Since this is a local thread, creating a new thread will invalidate the context; there will be no database session in the threads. All instances are deleted there and cannot properly track the relationship. One solution is to pass the app to each thread and do everything in the with app.app_context(): block. Inside the block, first use db.session.add to populate the local session with the passed instances. Then you must combine the master task to ensure consistency.

+3
source

I just want to clarify the problem and solution with some pseudo-code if someone has this problem / wants to do it in the future.

 class ObjA(object): obj_c = relationship('ObjC', backref='obj_c') class ObjB(object): obj_c = relationship('ObjC', backref='obj_c') class ObjC(object): obj_a_id = Column(Integer, ForeignKey('obj_a.id')) obj_b_id = Column(Integer, ForeignKey('obj_b.id')) def __init__(self, obj_a, obj_b): self.obj_a = obj_a self.obj_b = obj_b def make_a_bunch_of_c(obj_a, list_of_b=None): return [ObjC(obj_a, obj_b) for obj_b in list_of_b] def parallel_generate(): list_of_a = session.query(ObjA).all() # assume there are 1000 of these list_of_b = session.query(ObjB).all() # and 30 of these fxn = functools.partial(make_a_bunch_of_c, list_of_b=list_of_b) pool = multiprocessing.Pool(10) all_the_things = pool.map(fxn, list_of_a) return all_the_things 

Now let's stop here for a second. The initial problem was that trying to ADD in the ObjC list caused an error message in the original question:

 session.add_all(all_the_things) AssertionError: A conflicting state is already present in the identity map for key [...] 

Note. An error occurs during the add phase, an attempt to make a purchase never occurs, because the approval occurs with a preliminary commit. As far as I could judge.

Decision:

 all_the_things = parallel_generate() for thing in all_the_things: session.merge(thing) session.commit() 

The details of using the session when working with automatically added objects (through cascading relationships) are still outside of me, and I can not explain why the conflict arose. All I know is that using the merge function will force SQLAlchemy to sort all child objects created by 10 different processes into one session in the main process.

I would be interested to know why if someone happens through this.

+3
source

Source: https://habr.com/ru/post/1488195/


All Articles