Scrapy: non-blocking database inserts

I have a web scraper in Scrapy that produces data items, and I want to insert them into a database asynchronously.

For example, I have a transaction that inserts some elements into my db using SQLAlchemy Core:

    def process_item(self, item, spider):
        with self.connection.begin() as conn:
            conn.execute(insert(table1).values(item['part1']))
            conn.execute(insert(table2).values(item['part2']))

I understand that SQLAlchemy Core can be used asynchronously with Twisted via alchimia. Sample code from the alchimia documentation is included below.

I do not understand how to fit my code above into the alchimia structure. How can I configure process_item to use a reactor?

Can I do something like this?

    @inlineCallbacks
    def process_item(self, item, spider):
        with self.connection.begin() as conn:
            yield conn.execute(insert(table1).values(item['part1']))
            yield conn.execute(insert(table2).values(item['part2']))

And how would I wire up the reactor part?

Or is there an easier way to do non-blocking database inserts in the Scrapy pipeline?


For reference, here is sample code from the alchimia documentation:

    from alchimia import TWISTED_STRATEGY

    from sqlalchemy import (
        create_engine, MetaData, Table, Column, Integer, String
    )
    from sqlalchemy.schema import CreateTable

    from twisted.internet.defer import inlineCallbacks
    from twisted.internet.task import react


    @inlineCallbacks
    def main(reactor):
        engine = create_engine(
            "sqlite://", reactor=reactor, strategy=TWISTED_STRATEGY
        )

        metadata = MetaData()
        users = Table("users", metadata,
            Column("id", Integer(), primary_key=True),
            Column("name", String()),
        )

        # Create the table
        yield engine.execute(CreateTable(users))

        # Insert some users
        yield engine.execute(users.insert().values(name="Jeremy Goodwin"))
        yield engine.execute(users.insert().values(name="Natalie Hurley"))
        yield engine.execute(users.insert().values(name="Dan Rydell"))
        yield engine.execute(users.insert().values(name="Casey McCall"))
        yield engine.execute(users.insert().values(name="Dana Whitaker"))

        result = yield engine.execute(users.select(users.c.name.startswith("D")))
        d_users = yield result.fetchall()

        # Print out the users
        for user in d_users:
            print("Username: %s" % user[users.c.name])

    if __name__ == "__main__":
        react(main, [])
1 answer

How can I configure process_item to use a reactor?

You do not need to manage a reactor of your own in the pipeline.
Instead, you can perform asynchronous database interactions in the item pipeline by returning a Deferred from process_item.

See also the Scrapy documentation's sample code for an item pipeline that performs asynchronous operations by returning a Deferred.


Source: https://habr.com/ru/post/1265099/
