I ran into the same problem. As a workaround, I abandoned newAPIHadoopRDD and implemented a parallel loading mechanism based on splitting the collection into intervals over the document identifier and then loading each interval in parallel. The idea is to implement the following mongo shell code with the MongoDB Java driver:
    // Compute min and max id of the collection
    db.coll.find({},{_id:1}).sort({_id: 1}).limit(1)
        .forEach(function(doc) {min_id = doc._id})
    db.coll.find({},{_id:1}).sort({_id: -1}).limit(1)
        .forEach(function(doc) {max_id = doc._id})

    // Compute id ranges
    curr_id = min_id
    ranges = []
    page_size = 1000
    // equals() avoids the use of Comparable in the Java translation
    while(! curr_id.equals(max_id)) {
        prev_id = curr_id
        db.coll.find({_id : {$gte : curr_id}}, {_id : 1})
            .sort({_id: 1})
            .limit(page_size + 1)
            .forEach(function(doc) { curr_id = doc._id })
        ranges.push([prev_id, curr_id])
    }
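For reference, the min/max computation at the top of the script might translate to the Java driver roughly as follows. This is only a sketch against the legacy DBCollection API; the helper names minId and maxId are mine:

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;

    // Smallest _id: sort ascending on _id and take a single document.
    static Object minId(DBCollection coll) {
        DBCursor cursor = coll.find(new BasicDBObject(), new BasicDBObject("_id", 1))
                              .sort(new BasicDBObject("_id", 1)).limit(1);
        try {
            return cursor.hasNext() ? cursor.next().get("_id") : null;
        } finally {
            cursor.close();
        }
    }

    // Largest _id: the same query with a descending sort.
    static Object maxId(DBCollection coll) {
        DBCursor cursor = coll.find(new BasicDBObject(), new BasicDBObject("_id", 1))
                              .sort(new BasicDBObject("_id", -1)).limit(1);
        try {
            return cursor.hasNext() ? cursor.next().get("_id") : null;
        } finally {
            cursor.close();
        }
    }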
Now we can use ranges to perform fast queries over fragments of the collection. Note that the last fragment needs to be queried differently, with only the minimum bound as its limit, to avoid losing the last document of the collection:
    db.coll.find({_id : {$gte : ranges[1][0], $lt : ranges[1][1]}})
    db.coll.find({_id : {$gte : ranges[2][0]}})
I implemented this as a Java method "LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize)" for a simple Range POJO (sketched below), and then parallelized the resulting list and transformed it with flatMapToPair to obtain an RDD similar to the one returned by newAPIHadoopRDD.
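Here is a minimal sketch of what such a method could look like; the Range POJO, its min/max fields, and the minId/maxId helpers above are my assumptions, not the exact original code. Guava's Optional is used for the upper bound because, unlike a nullable field with ad hoc checks, it makes the open-ended last range explicit, and it is Serializable, which matters once Range instances are shipped to Spark workers:

    import java.io.Serializable;
    import java.util.LinkedList;
    import com.google.common.base.Optional;
    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;

    // An _id interval [min, max); max is absent for the last range.
    public static class Range implements Serializable {
        public final Object min;
        public final Optional<Object> max;
        public Range(Object min, Optional<Object> max) {
            this.min = min;
            this.max = max;
        }
    }

    public static LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize) {
        LinkedList<Range> ranges = new LinkedList<Range>();
        Object first = minId(coll);  // helpers sketched above
        Object last = maxId(coll);
        if (first == null) {
            return ranges;  // empty collection
        }
        Object currId = first;
        // Page through the _id index: each query reads rangeSize + 1 ids
        // (the first one is currId itself, because of $gte) and keeps the last.
        while (!currId.equals(last)) {
            Object prevId = currId;
            DBCursor cursor = coll.find(
                    new BasicDBObject("_id", new BasicDBObject("$gte", currId)),
                    new BasicDBObject("_id", 1))
                .sort(new BasicDBObject("_id", 1))
                .limit(rangeSize + 1);
            try {
                while (cursor.hasNext()) {
                    currId = cursor.next().get("_id");
                }
            } finally {
                cursor.close();
            }
            ranges.add(new Range(prevId, Optional.of(currId)));
        }
        // Reopen the last range without an upper bound so the document with
        // _id == last is not lost (see the note on the last fragment above).
        if (ranges.isEmpty()) {
            ranges.add(new Range(first, Optional.<Object>absent()));
        } else {
            Range lastRange = ranges.removeLast();
            ranges.add(new Range(lastRange.min, Optional.<Object>absent()));
        }
        return ranges;
    }

With the ranges computed, the parallelization and conversion look like this: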
    List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE);
    JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size());
    JavaPairRDD<Object, BSONObject> mongoRDD =
        parallelRanges.flatMapToPair(
            new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() {
                ...
                BasicDBObject query = range.max.isPresent()
                    ? new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                                                   .append("$lt", range.max.get()))
                    : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
                ...
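To make the elided parts concrete, here is a hedged sketch of how the full flatMapToPair call might look with the Spark 1.x Java API. The connection details ("localhost", "mydb", "coll") are placeholders of mine; each task opens its own client because DBCollection is not serializable:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.bson.BSONObject;
    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;
    import com.mongodb.MongoClient;
    import scala.Tuple2;

    JavaPairRDD<Object, BSONObject> mongoRDD = parallelRanges.flatMapToPair(
        new PairFlatMapFunction<Range, Object, BSONObject>() {
            @Override
            public Iterable<Tuple2<Object, BSONObject>> call(Range range) throws Exception {
                // Placeholder connection details; open one client per task
                // because DBCollection cannot be shipped to the workers.
                MongoClient client = new MongoClient("localhost", 27017);
                try {
                    DBCollection coll = client.getDB("mydb").getCollection("coll");
                    // Query the [min, max) interval; the last range has no
                    // upper bound, so it only uses $gte.
                    BasicDBObject query = range.max.isPresent()
                        ? new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                                                       .append("$lt", range.max.get()))
                        : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
                    List<Tuple2<Object, BSONObject>> docs =
                        new ArrayList<Tuple2<Object, BSONObject>>();
                    DBCursor cursor = coll.find(query);
                    try {
                        while (cursor.hasNext()) {
                            BSONObject doc = cursor.next();
                            docs.add(new Tuple2<Object, BSONObject>(doc.get("_id"), doc));
                        }
                    } finally {
                        cursor.close();
                    }
                    return docs;
                } finally {
                    client.close();
                }
            }
        });

Opening one connection per range is wasteful if the ranges are small; batching several ranges per partition is an option, but this keeps the sketch close to the original code.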
You can play with the range size and the number of slices passed to parallelize() to control the granularity of the parallelism.
I hope this helps,
Greetings,
Juan Rodriguez Hortala