All documents in my MongoDB collection have the same fields. My goal is to load them into Python as a pandas.DataFrame or dask.DataFrame.
I would like to speed up the loading process by parallelizing it. My plan is to create several processes or threads, each of which loads a chunk of the collection, and then merge these chunks together.
How do I do this using MongoDB?
I tried a similar approach with PostgreSQL. My original idea was to use SKIP and LIMIT in the SQL queries. It failed because each cursor, opened for each particular query, started reading the table from the very beginning and simply skipped the specified number of rows. So I had to add an extra column containing the record numbers and specify ranges of these numbers in the queries.
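For reference, this is roughly what that PostgreSQL workaround looked like (just a sketch: the table name records, the extra column row_num, and psycopg2 as the driver are placeholders, not my actual schema):

    import pandas as pd
    import psycopg2

    def load_pg_chunk(dsn, lo, hi):
        # Range predicate on the explicit row-number column instead of OFFSET/LIMIT,
        # so the server can seek via an index on row_num rather than scanning from the start.
        with psycopg2.connect(dsn) as conn:
            return pd.read_sql_query(
                "SELECT * FROM records WHERE row_num >= %s AND row_num < %s",
                conn, params=(lo, hi))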
MongoDB, in contrast, assigns a unique ObjectId to each document. However, I found that one ObjectId cannot be subtracted from another; they can only be compared with the ordering operations: less than, greater than, and equal.
In addition, pymongo returns a cursor object, which supports indexing and has some methods that seem useful for my task, such as count and limit.
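For example, a quick sketch of what the pymongo cursor offers (assuming collection is a pymongo Collection; cursor.count() exists in the older pymongo 3.x I am using):

    import pymongo

    cur = collection.find({}).sort('_id', pymongo.ASCENDING)
    print(cur.count())                  # total number of matching documents
    first, tenth = cur[0], cur[9]       # integer indexing works on an unused cursor
    print(first['_id'] < tenth['_id'])  # ObjectIds only support ordering comparisons
    page = collection.find({}).skip(100).limit(50)  # but skip() still scans from the start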
The MongoDB connector for Spark accomplishes this task somehow. Unfortunately, I am not familiar with Scala, so it is hard for me to figure out how they do it.
So, what is the correct way to load data from MongoDB into Python in parallel?
So far I have come to the following solution:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

collection = get_mongo_collection()
cursor = collection.find({ })

def process_document(in_doc):
    out_doc = ...  # build the output record from in_doc
    return pd.DataFrame(out_doc)

df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
However, it looks like dask.dataframe.from_delayed internally builds a list from the passed generator, effectively loading the entire collection in a single thread.
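In other words, using the names from the snippet above, roughly this is what effectively happens (an illustration, not the actual dask internals):

    # Iterating the cursor (and therefore all of the MongoDB I/O) runs
    # sequentially in the driver; only the per-document conversion is deferred.
    parts = []
    for d in cursor:                                # sequential network reads
        parts.append(delayed(process_document)(d))  # d is already in memory here
    df = dd.from_delayed(parts)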
Update. I found that skip on a pymongo.Cursor behaves the same way as in PostgreSQL: the cursor still scans the collection from the beginning and merely skips documents. So instead I decided to partition the collection by _id: collect a sorted list of boundary _id values and query each _id range separately.
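A query for one such _id range would look roughly like this (lo_id and hi_id stand for two consecutive boundary ObjectIds, collection as above):

    # lo_id and hi_id are two consecutive boundaries from a sorted pass over _id
    chunk_cursor = collection.find({'_id': {'$gte': lo_id, '$lt': hi_id}})
    chunk_df = pd.DataFrame(list(chunk_cursor))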
Update 2. I found how the MongoDB connector for Spark partitions the data: https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32
It looks like it also splits the collection into partitions, but I could not fully work out how.
Update 3. Apparently it is not possible to pass a pymongo Collection object into dask: pymongo raises an exception as soon as dask tries to handle the Collection:
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
81 return expr, {}
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
Here is the code:
import pymongo  # plus the pandas/dask imports from the snippet above

def process_document(in_doc, other_arg):
    # ... build out_doc from in_doc ...
    return out_doc

def compute_id_ranges(collection, query, partition_size=50):
    # Walk the _id index in ascending order and record every
    # partition_size-th _id as a partition boundary.
    cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING)
    id_ranges = [cur[0]['_id']]
    count = 1
    for r in cur:
        count += 1
        if count > partition_size:
            id_ranges.append(r['_id'])
            count = 0
    id_ranges.append(r['_id'])  # close the last partition with the final _id
    return zip(id_ranges[:-1], id_ranges[1:])
def load_chunk(id_pair, collection, query={}, projection=None):
    # Copy the query so the shared dict passed by the caller is not mutated.
    q = dict(query)
    q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}})
    cur = collection.find(q, projection)
    # other_arg stands for whatever extra state process_document needs
    return pd.DataFrame([process_document(d, other_arg) for d in cur])
def parallel_load(*args, **kwargs):
    collection = kwargs['collection']
    query = kwargs.get('query', {})
    projection = kwargs.get('projection', None)
    id_ranges = compute_id_ranges(collection, query)
    dfs = [delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges]
    df = dd.from_delayed(dfs)
    return df
collection = connect_to_mongo_and_return_collection_object(credentials)
id_ranges = compute_id_ranges(collection, {})
dedf = delayed(load_chunk)(id_ranges[0], collection)
Calling load_chunk directly works fine, but delayed(load_chunk)( blah-blah-blah ) fails with the exception shown above.
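One workaround I am considering (just a sketch, not a verified solution): pass only the connection parameters into each task and open the pymongo connection inside the task, so no Collection object ever enters the dask graph. The names mongo_uri, db_name and collection_name are placeholders:

    import pandas as pd
    import pymongo
    import dask.dataframe as dd
    from dask.delayed import delayed

    def load_chunk_by_params(id_pair, mongo_uri, db_name, collection_name,
                             query=None, projection=None):
        # Connect inside the task, so no pymongo objects cross the dask graph.
        client = pymongo.MongoClient(mongo_uri)
        coll = client[db_name][collection_name]
        q = dict(query or {})
        q.update({'_id': {'$gte': id_pair[0], '$lt': id_pair[1]}})
        docs = list(coll.find(q, projection))
        client.close()
        return pd.DataFrame(docs)

    # dfs = [delayed(load_chunk_by_params)(ir, mongo_uri, db_name, collection_name)
    #        for ir in id_ranges]
    # ddf = dd.from_delayed(dfs)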