How multi-threaded read dictionaries from a list and enter into a database

I am trying to multithreadedly read the following code, and I just can't get it to work.

The following code (from which I removed most of the code for illustrative purposes only) currently runs smoothly but slowly (approximately 5 minutes for a list of 3600 tweets).

import dataset import datetime import json with open("postgresConnecString.txt", 'r') as f: DB_CONNECTIONSTRING = f.readline() DB = dataset.connect(DB_CONNECTIONSTRING) def load_tweet(tweet, tweets_saved): """Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database""" try: data = {'tweet_id': tweet['tweet_id', 'tweet_json': json.dumps(tweet)} # Dictionary that contains the data I need from the tweet DB['tweets'].upsert(data, ['tweet_id']) tweets_saved += 1 if tweets_saved % 100 == 0: print('Saved ' + str(tweets_saved) + ' tweets') return tweets_saved except KeyError: return tweets_saved if __name__ == "__main__": tweets['tweet1', 'tweet2'] for tweet in tweets: tweets_saved = load_tweet(tweet, tweets_saved) 

So I was looking for an option to do this multithreaded. However, I have not yet found a way that I can:

  • Multi-threaded extraction process;
  • Print the counter for 100, 500 or 1000 tweets;

Going through this tutorial did not give me understanding to do it yet: the class concepts for each thread, what I need to configure in the class and implement the queue at the moment is very difficult for me to understand; I'm just getting started.

  • Can anyone specify how I will use the script above using multiple threads?
  • How many threads should I use? Python currently uses ~ 1% of my processor when running a script and ~ 10% of RAM (my system specifications )
  • How can I take care of increasing the counter (using the Lock () function?) And print it when the counter hits% 100?

EDIT: as requested: here are the big snapshots from the profiling result (using dataset.upsert):

  ncalls tottime percall cumtime percall filename:lineno(function) 5898 245.133 0.042 245.133 0.042 :0(_connect) 5898 12.137 0.002 12.206 0.002 :0(execute) 

Here is the second attempt with 'dataset.insert' instead of 'dataset.upsert':

 1386332 function calls (1382960 primitive calls) in 137.255 seconds ncalls tottime percall cumtime percall filename:lineno(function) 2955 122.646 0.042 122.646 0.042 :0 (_connect) 

Last (and certainly not less), here is the time you run the psycopg2 source code.

 63694 function calls (63680 primitive calls) in 2.203 seconds 

In conclusion, do not use the data set for performance (although it took me 10 minutes to write psycopg2 code, and that is โ†’ 10 seconds for the .upsert data set)

  • Now, regarding the original question. Will I be able to reduce a 2-second file per file in multi-threading? How?

Full code can be found here.

+4
source share
2 answers

A few things that can be improved:

Run the entire batch in one transaction. Using a transaction means that the database is not required for the actual commit (writing data to disk) for each individual record, but it can accumulate uncommitted data in memory. This usually leads to more efficient use of resources.

Add a unique index on tweet_id. Without a unique index, you can force the database to perform sequential scans on each upserts, which leads to an increase in memory to O (n ** 2).

Separate insert and updates, use .insert_many () when you can, not .upsert (). Before doing bulk upsert, you make a pre-flight request to find out the tweet_ids list that exists in both the database and your tweet list. Use .insert_many () to insert elements that do not already exist in the database, and simple .update () for those that already exist.

+3
source

I donโ€™t know if you can improve productivity. But about how I think you will need concurrent.futures.Executor.map. ProcessPoolExecutor, not ThreadPoolExecutor, should be what you want, although I'm not an expert.

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map

If you want to show progress, look at concurrent.futures.as_completed from the same module.

0
source

Source: https://habr.com/ru/post/1210660/


All Articles