Multiprocessing module for updating a shared dictionary in Python

I create a dictionary as follows:

    y = [(1, 2), (2, 3), (1, 2), (5, 6)]
    dict = {}
    for tup in y:
        tup = tuple(sorted(tup))
        if tup in dict.keys():
            dict[tup] = dict[tup] + 1
        else:
            dict[tup] = 1

However, my actual y contains about 40 million tuples. Is there a way to use multiprocessing to speed this process up?

thanks

+5
5 answers

If you want a count that ignores order, use frozenset with a Counter:

    from collections import Counter

    print(Counter(map(frozenset, y)))

Using tuples from another answer:

    In [9]: len(tuples)
    Out[9]: 500000

    In [10]: timeit Counter(map(frozenset, tuples))
    1 loops, best of 3: 582 ms per loop

Using frozenset means that (1, 2) and (2, 1) are counted as the same pair:

    In [12]: y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)]

    In [13]: from collections import Counter

    In [14]: print(Counter(map(frozenset, y)))
    Counter({frozenset({1, 2}): 3, frozenset({5, 6}): 2, frozenset({2, 3}): 1})

If you apply the same logic with multiprocessing it will obviously be faster still; even without multiprocessing, this approach beats the multiprocessing solution given in the other answer.
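
For example, a minimal sketch of that combination (the helper names count_chunk and chunked are my own illustration, not code from the answer): each worker process builds a Counter of frozensets for its own chunk, and the partial Counters are then summed in the parent.

    from collections import Counter
    from multiprocessing import Pool

    def count_chunk(chunk):
        # Each worker builds its own Counter of order-insensitive pairs.
        return Counter(map(frozenset, chunk))

    def chunked(seq, size):
        # Split the input into pieces of `size` items (the last may be shorter).
        for i in range(0, len(seq), size):
            yield seq[i:i + size]

    if __name__ == '__main__':
        y = [(1, 2), (2, 3), (1, 2), (5, 6), (2, 1), (6, 5)] * 1000
        pool = Pool()
        partial_counters = pool.map(count_chunk, list(chunked(y, 1000)))
        pool.close()
        pool.join()
        # Summing Counters adds up the counts for keys seen in several chunks.
        total = sum(partial_counters, Counter())
        print(total.most_common(3))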

+3

You can follow a MapReduce approach.

    from collections import Counter
    from multiprocessing import Pool

    NUM_PROCESSES = 8

    y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 10

    ## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
    def chunks(l, n):
        """Yield successive n-sized chunks from l."""
        for i in xrange(0, len(l), n):
            yield l[i:i + n]

    ## map
    partial_counters = Pool(NUM_PROCESSES).map(Counter, chunks(y, NUM_PROCESSES))

    ## reduce
    reduced_counter = reduce(Counter.__add__, partial_counters)

    ## Result is:
    ## Counter({(1, 2): 20, (5, 6): 10, (2, 3): 10})

The idea is this:

  • split your input list into chunks
  • pass each chunk to a separate process, which independently computes a partial counter
  • combine all the partial counters with a reduce operation

EDIT: use chunks(map(frozenset, y), NUM_PROCESSES) to account for unordered pairs.

+2

First of all, instead of checking membership of tup in dict.keys() on every iteration, which is a really bad idea, you can use collections.defaultdict(), which is more pythonic:

    from collections import defaultdict

    test_dict = defaultdict(int)   # missing keys start at 0
    for tup in y:
        tup = tuple(sorted(tup))
        test_dict[tup] += 1

Secondly, if you want concurrency you can use either multithreading or multiprocessing. With multithreading, because of the GIL, multiple threads cannot execute Python bytecode at the same time, so you cannot, for example, walk your tuples from both ends the way a bidirectional search (BDS) algorithm would.

With multiprocessing, on the other hand, you have a different problem: every core has to access the same shared memory and work on it at once. For more information, read this answer: fooobar.com/questions/796323 / ....

So what is the trick?

One way is to split your list into small chunks and use multiprocessing to apply your function to each chunk.

Another way is to use coroutines: as mentioned in this answer, Greg Ewing has a great demonstration of how to use yield from with coroutines to build things like schedulers or many-actor simulations.

0

Edit: answer updated to be thread safe

The multiprocessing module makes this easy.

Just restructure your code so the processing is done in a function:

    def process_tuple(tuples):
        count_dict = {}
        for tuple_ in tuples:
            tuple_ = tuple(sorted(tuple_))
            if tuple_ in count_dict:
                count_dict[tuple_] += 1
            else:
                count_dict[tuple_] = 1
        return count_dict

Divide the list of tuples into small chunks, then use map to process all of your chunks.

    from multiprocessing import Pool

    ## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
    def chunks(l, n):
        """Yield successive n-sized chunks from l."""
        for i in xrange(0, len(l), n):
            yield l[i:i + n]

    # cut the tuples list into chunks of 5
    tuples_groups = chunks(tuples, 5)
    pool = Pool(5)
    count_dict = {}
    # process the chunks in parallel
    results = pool.map(process_tuple, tuples_groups)
    # collect the results, summing counts for keys that appear in several chunks
    for result in results:
        for key, count in result.items():
            count_dict[key] = count_dict.get(key, 0) + count

multiprocessing.Pool will handle distributing the work between the processes.

Full example + tests:

    import time
    import random

    start_time = time.time()
    tuples = []
    x, y = (100000, 10)
    for i in range(x):
        tuple_ = []
        for j in range(y):
            tuple_.append(random.randint(0, 9))
        tuples.append(tuple(tuple_))

    print("--- %s data generated in %s seconds ---" % (x * y, time.time() - start_time))

    def process_tuple(tuples):
        count_dict = {}
        for tuple_ in tuples:
            tuple_ = tuple(sorted(tuple_))
            if tuple_ in count_dict:
                count_dict[tuple_] += 1
            else:
                count_dict[tuple_] = 1
        return count_dict

    from multiprocessing import Pool

    start_time = time.time()

    ## http://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks-in-python
    def chunks(l, n):
        """Yield successive n-sized chunks from l."""
        for i in xrange(0, len(l), n):
            yield l[i:i + n]

    # cut the tuples list into chunks of 5
    tuples_groups = chunks(tuples, 5)
    pool = Pool(5)
    count_dict = {}
    # process the chunks in parallel
    results = pool.map(process_tuple, tuples_groups)
    # collect the results, summing counts for keys that appear in several chunks
    for result in results:
        for key, count in result.items():
            count_dict[key] = count_dict.get(key, 0) + count

    print("--- Multiprocessing processed in %s seconds ---" % (time.time() - start_time))

    start_time = time.time()
    count_dict = {}
    for tuple_ in tuples:
        tuple_ = tuple(sorted(tuple_))
        if tuple_ in count_dict:
            count_dict[tuple_] += 1
        else:
            count_dict[tuple_] = 1

    print("--- Single thread processed in %s seconds ---" % (time.time() - start_time))

--- 10,000,000 data generated in 32.7803010941 seconds ---
--- Multiprocessing processed in 1.79116892815 seconds ---
--- Single thread processed in 2.65010404587 seconds ---

0

Since you want to increment counts (not just create new key/value pairs), the dictionary is not thread safe unless you acquire a semaphore around each update and release it afterwards, so I don't think you will get any overall speed increase; in fact, it may be slower (roughly the pattern sketched below).
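
To make that concrete, here is a rough sketch (my own illustration, not code from this answer) of the synchronized shared-dictionary approach being warned against: a multiprocessing.Manager dict with a lock acquired around every increment. Every update round-trips through the manager process and waits on the lock, which is why it tends to be slower than letting each worker count into its own private dict and merging once at the end, as the answers above do.

    from multiprocessing import Manager, Pool

    def count_into_shared(args):
        chunk, shared, lock = args
        for tup in chunk:
            key = tuple(sorted(tup))
            # Every single increment takes the lock and goes through the
            # manager process, which largely serializes the workers again.
            with lock:
                shared[key] = shared.get(key, 0) + 1

    if __name__ == '__main__':
        y = [(1, 2), (2, 3), (1, 2), (5, 6)] * 1000
        manager = Manager()
        shared = manager.dict()
        lock = manager.Lock()
        pieces = [y[i:i + 500] for i in range(0, len(y), 500)]
        pool = Pool(4)
        pool.map(count_into_shared, [(c, shared, lock) for c in pieces])
        pool.close()
        pool.join()
        print(dict(shared))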

If you do go down this route, it is probably best for each worker to update its own dictionary and then combine the results as each one completes, so thread safety is not a concern. However, since this is likely to be CPU bound, you should use multiprocessing rather than threads: multiprocessing can use all of your processor cores.

In addition, if you use collections.Counter, it will do the counting for you, supports merging, and has a useful most_common(n) method that returns the keys with the n highest counts.
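
A tiny illustration of that merging behaviour (the two partial counters here are made up for the example): Counters add per key, and most_common(n) then reports the most frequent pairs.

    from collections import Counter

    # Pretend these are the partial counts returned by two worker processes.
    part1 = Counter({(1, 2): 3, (2, 3): 1})
    part2 = Counter({(1, 2): 2, (5, 6): 4})

    merged = part1 + part2          # adds the counts key by key
    print(merged.most_common(2))    # [((1, 2), 5), ((5, 6), 4)]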

0
