Parallel loading of data from MongoDB in Python

All documents in my MongoDB collection have the same fields. My goal is to load them into Python as a pandas.DataFrame or a dask.DataFrame.

I would like to speed up loading by parallelizing it. My plan is to create several processes or threads, each of which loads its own piece of the collection; the pieces are then merged together.

How do I do this using MongoDB?

I tried a similar approach with PostgreSQL. My original idea was to use SKIP and LIMIT in the SQL queries. It failed: each cursor, opened for its own query, started reading the table from the very beginning and simply skipped the specified number of rows. So I had to add an extra column holding record numbers and specify ranges of those numbers in the queries.
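For comparison, here is a minimal sketch of that range-based PostgreSQL workaround using psycopg2; the table name my_table and the numbering column row_num are hypothetical:

import pandas as pd
import psycopg2

def load_pg_range(dsn, lo, hi):
    # each worker reads only its own half-open range of record numbers,
    # so no query has to scan and discard rows the way OFFSET/LIMIT would
    with psycopg2.connect(dsn) as conn:
        return pd.read_sql_query(
            "SELECT * FROM my_table WHERE row_num >= %(lo)s AND row_num < %(hi)s",
            conn, params={"lo": lo, "hi": hi})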

MongoDB, in contrast, assigns a unique ObjectId to each document. However, it turns out that one ObjectId cannot be subtracted from another; they only support the ordering comparisons: less than, greater than, and equal.
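A tiny illustration of this (the bson package ships with pymongo):

from bson.objectid import ObjectId

a = ObjectId()    # generated first
b = ObjectId()    # generated afterwards in the same process
print(a < b)      # True: ordering follows creation order
# b - a           # would raise TypeError: subtraction is not defined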

In addition, pymongo returns a Cursor object that supports indexing and has methods that look useful for my task, such as count and limit.
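For reference, this is what those Cursor features look like in the older PyMongo releases this post is based on (Cursor.count() was removed in PyMongo 4.0); collection is assumed to be a pymongo.Collection:

n_docs = collection.find({}).count()          # total number of matching documents
doc_10 = collection.find({})[9]               # indexing clones the cursor with skip/limit
first_100 = list(collection.find({}).limit(100))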

The MongoDB connector for Spark manages to do this somehow. Unfortunately, I am not familiar with Scala, so it is hard for me to work out how they do it.

So what is the correct way to load data from MongoDB into Python in parallel?

So far I have come to the following solution:

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

# import other modules.

collection = get_mongo_collection()
cursor = collection.find({ })

def process_document(in_doc):
    out_doc = ...  # process the document's keys and values here
    return pd.DataFrame(out_doc)

df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )

However, it looks like dask.dataframe.from_delayed internally builds a list from the passed generator, so the entire collection is pulled through a single thread while the graph is being constructed.
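A small self-contained sketch of the effect; fake_cursor below is just a stand-in for the Mongo cursor. Building the delayed objects already iterates the source, so every document is fetched by the one thread that constructs the graph:

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

def fake_cursor():
    # stands in for the pymongo cursor; each yield would be a network round-trip
    for i in range(3):
        print('fetching document', i)
        yield {'x': i, 'y': i * i}

def process_document(doc):
    return pd.DataFrame([doc])

parts = (delayed(process_document)(d) for d in fake_cursor())
ddf = dd.from_delayed(parts)   # all "fetching document ..." lines print here,
                               # before any parallel computation has started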

Update. It turned out that skipping records in a pymongo.Cursor behaves the same way as in PostgreSQL: the cursor reads the collection from the beginning and simply discards the skipped documents, so skip/limit-based partitioning is out. What does look workable is splitting the collection into ranges of _id values: collect the boundary _id values and query each range separately.

Update2. I found the partitioner used by the official MongoDB connector for Spark: https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32

Update3. An incomplete solution so far.

Passing the pymongo Collection object into dask does not work: dask tries to iterate over it and fails:

/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
     81         return expr, {}
     82     if isinstance(expr, (Iterator, list, tuple, set)):
---> 83         args, dasks = unzip((to_task_dask(e) for e in expr), 2)
     84         args = list(args)
     85         dsk = sharedict.merge(*dasks)

/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
   2342 
   2343     def __next__(self):
-> 2344         raise TypeError("'Collection' object is not iterable")
   2345 
   2346     next = __next__

TypeError: 'Collection' object is not iterable

Here is the current code:

import pymongo
from bson.max_key import MaxKey

def process_document(in_doc, other_arg):
    out_doc = ...  # custom processing of incoming records
    return out_doc

def compute_id_ranges(collection, query=None, partition_size=50):
    # walk the sorted _id index and record a boundary value roughly
    # every partition_size documents
    cur = collection.find(query or {}, {'_id': 1}).sort('_id', pymongo.ASCENDING)
    id_ranges = [cur[0]['_id']]
    count = 1
    for r in cur:
        count += 1
        if count > partition_size:
            id_ranges.append(r['_id'])
            count = 0
    # MaxKey sorts after every ObjectId, so the "$lt" upper bound used in
    # load_chunk does not drop the final document
    id_ranges.append(MaxKey())
    return list(zip(id_ranges[:-1], id_ranges[1:]))


def load_chunk(id_pair, collection, query=None, projection=None, other_arg=None):
    # copy the query so the caller's dict is not mutated, then restrict it
    # to the half-open _id range [id_pair[0], id_pair[1])
    q = dict(query or {})
    q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}})
    cur = collection.find(q, projection)

    return pd.DataFrame([process_document(d, other_arg) for d in cur])


def parallel_load(*args, **kwargs):
    collection = kwargs['collection']
    query = kwargs.get('query', {})
    projection = kwargs.get('projection', None)

    id_ranges = compute_id_ranges(collection, query)

    dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ]
    df = dd.from_delayed(dfs)
    return df

collection = connect_to_mongo_and_return_collection_object(credentials)

# df = parallel_load(collection=collection)

id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)

load_chunk runs fine when called directly. However, the call delayed(load_chunk)( blah-blah-blah ) fails with the exception shown above.
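Judging by the traceback, dask treats the Collection argument as an iterator while traversing the call's arguments and trips over it. A plausible workaround, shown here as a sketch only (MONGO_URI, DB_NAME and COLL_NAME are hypothetical configuration values), is to keep the Collection object out of the delayed call entirely: pass plain connection parameters and open the collection inside the task. This also sidesteps serialization problems if a process-based scheduler is used later.

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
from pymongo import MongoClient

def load_chunk_by_uri(id_pair, mongo_uri, db_name, coll_name,
                      query=None, projection=None):
    # the client and collection are created inside the task, so dask only
    # ever sees plain strings and ObjectIds as arguments
    coll = MongoClient(mongo_uri)[db_name][coll_name]
    q = dict(query or {})
    q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}})
    return pd.DataFrame(list(coll.find(q, projection)))

# dfs = [delayed(load_chunk_by_uri)(ir, MONGO_URI, DB_NAME, COLL_NAME)
#        for ir in id_ranges]
# df = dd.from_delayed(dfs)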


" mans, thery're rulez":)

pymongo.Collection has a parallel_scan method that does exactly this: it returns a list of cursors over non-overlapping parts of the collection, which can be processed in parallel.

UPDATE. This function can do the job if the collection does not change too often and the queries are always the same (my case). One could also store query results in different collections and run parallel scans over those.
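For what it's worth, a minimal sketch of that route, assuming an older PyMongo (2.7 to 3.x) and a MongoDB server before 4.2, where the underlying parallelCollectionScan command still exists; get_mongo_collection() is the question's own placeholder:

from multiprocessing.pool import ThreadPool
import pandas as pd

def cursor_to_frame(cur):
    # drain one of the command cursors returned by parallel_scan
    return pd.DataFrame(list(cur))

collection = get_mongo_collection()
cursors = collection.parallel_scan(4)     # up to 4 non-overlapping cursors
# the cursors cannot be sent to other processes, so use threads
with ThreadPool(len(cursors)) as pool:
    frames = pool.map(cursor_to_frame, cursors)
df = pd.concat(frames, ignore_index=True)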


I solve a similar task this way and it works: a collection of about 40 million documents gets chewed through with the CPU at 100% (and the air conditioner at 100% too :).

The collection is split with skip and limit, and each slice is handled by its own process. Python 3:

import multiprocessing
from pymongo import MongoClient

def your_function(something):
    # <your per-document processing goes here>
    result = ...
    return result

def process_cursor(skip_n, limit_n):
    print('Starting process', skip_n // limit_n, '...')
    collection = MongoClient().<db_name>.<collection_name>
    cursor = collection.find({}).skip(skip_n).limit(limit_n)
    for doc in cursor:
        # <do your magic>
        # for example, do some processing on each document:
        result = your_function(doc['your_field'])
        # update that document by adding the result into a new field
        collection.update_one({'_id': doc['_id']}, {'$set': {'<new_field_eg>': result}})

    print('Completed process', skip_n // limit_n, '...')


if __name__ == '__main__':
    n_cores = 7                 # number of splits (logical CPU cores minus one)
    collection_size = 40126904  # your collection size
    batch_size = round(collection_size / n_cores + 0.5)
    skips = range(0, n_cores * batch_size, batch_size)

    processes = [multiprocessing.Process(target=process_cursor, args=(skip_n, batch_size))
                 for skip_n in skips]

    for process in processes:
        process.start()

    for process in processes:
        process.join()
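If the goal is still a DataFrame in the parent process (as in the original question) rather than writing results back to MongoDB, the same skip/limit split could, for example, be fed through a multiprocessing.Pool; your_db and your_collection below are placeholders, just like the names in the snippet above:

import multiprocessing
import pandas as pd
from pymongo import MongoClient

def load_batch(args):
    skip_n, limit_n = args
    # each worker opens its own connection; a MongoClient must not be
    # shared across fork boundaries
    collection = MongoClient().your_db.your_collection
    return pd.DataFrame(list(collection.find({}).skip(skip_n).limit(limit_n)))

if __name__ == '__main__':
    n_cores = 7
    collection_size = 40126904
    batch_size = -(-collection_size // n_cores)   # ceiling division
    tasks = [(skip_n, batch_size)
             for skip_n in range(0, n_cores * batch_size, batch_size)]
    with multiprocessing.Pool(n_cores) as pool:
        df = pd.concat(pool.map(load_batch, tasks), ignore_index=True)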



Source: https://habr.com/ru/post/1677550/

