Exception when connecting to MongoDB from Spark

I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode when trying to use MongoDB as an input RDD:

    Configuration conf = new Configuration();
    conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
    JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
        MongoInputFormat.class, Object.class, BSONObject.class);
    System.out.println(rdd.count());

Along with the exception I get:

    14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Splitting the input:
    MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false}
    14/08/06 09:49:57 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
    java.lang.IllegalStateException: not ready
        at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
        at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
        at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
        at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
        at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
        at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
        at java.lang.reflect.Method.invoke(Method.java:618)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
        at java.lang.Thread.run(Thread.java:804)

All program output is here.

Environment:

  • Red Hat
  • Spark 1.0.1
  • Hadoop 2.4.1
  • MongoDB 2.4.10
  • Mongo-Hadoop 1.3
3 answers

I think I found the problem: mongo-hadoop puts a "static" modifier on its BSON encoder/decoder instances in core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java. When Spark runs in multi-threaded mode, all the threads try to deserialize using the same encoder/decoder instances, which predictably ends badly.

The patch is on my GitHub here (I have sent a pull request upstream).
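For illustration only, a minimal sketch of the kind of change involved; the class and field names below are placeholders, not the actual mongo-hadoop source:

    import org.bson.BasicBSONDecoder;
    import org.bson.BasicBSONEncoder;

    // Sketch: moving from shared static codecs to per-instance ones, so that
    // concurrent readFields()/write() calls on different splits do not share
    // mutable decoder state.
    public class SplitCodecSketch {

        // Problematic pattern: one shared, stateful decoder for every thread.
        // static final BasicBSONDecoder SHARED_DECODER = new BasicBSONDecoder();

        // Thread-safer pattern: each split gets its own encoder/decoder.
        private final BasicBSONDecoder bsonDecoder = new BasicBSONDecoder();
        private final BasicBSONEncoder bsonEncoder = new BasicBSONEncoder();

        public BasicBSONDecoder getBsonDecoder() {
            return bsonDecoder;
        }

        public BasicBSONEncoder getBsonEncoder() {
            return bsonEncoder;
        }
    }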

Now I can run a multi-threaded Spark -> MongoDB collection count() across 8 cores from Python!


I ran into the same problem. As a workaround, I abandoned newAPIHadoopRDD and implemented a parallel loading mechanism based on splitting the collection into intervals over the document _id and then loading each interval in parallel. The idea is to implement the following mongo shell code with the MongoDB Java driver:

    // Compute min and max id of the collection
    db.coll.find({}, {_id: 1}).sort({_id: 1}).limit(1)
        .forEach(function(doc) { min_id = doc._id })
    db.coll.find({}, {_id: 1}).sort({_id: -1}).limit(1)
        .forEach(function(doc) { max_id = doc._id })

    // Compute id ranges
    curr_id = min_id
    ranges = []
    page_size = 1000
    // to avoid the use of Comparable in the Java translation
    while (!curr_id.equals(max_id)) {
        prev_id = curr_id
        db.coll.find({_id: {$gte: curr_id}}, {_id: 1})
            .sort({_id: 1})
            .limit(page_size + 1)
            .forEach(function(doc) { curr_id = doc._id })
        ranges.push([prev_id, curr_id])
    }

Now we can use the ranges to run fast queries over fragments of the collection. Note that the last fragment has to be queried differently, with only the minimum bound, so the last document in the collection is not lost.

    db.coll.find({_id: {$gte: ranges[1][0], $lt: ranges[1][1]}})
    db.coll.find({_id: {$gte: ranges[2][0]}})

I implemented this as a Java method "LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize)" over a simple Range POJO, then I parallelize the resulting list and transform it with flatMapToPair to produce an RDD similar to the one returned by newAPIHadoopRDD (a sketch of computeIdRanges follows the snippet below).

    List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE);
    JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size());
    JavaPairRDD<Object, BSONObject> mongoRDD =
        parallelRanges.flatMapToPair(
            new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() {
                ...
                BasicDBObject query = range.max.isPresent()
                    ? new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                                                   .append("$lt", range.max.get()))
                    : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
                ...
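For reference, this is roughly how computeIdRanges could look with the legacy DBCollection API. It is only a sketch: the Range POJO below uses a nullable max instead of the Optional-style range.max.isPresent() above, and the names are placeholders rather than the actual code.

    import java.io.Serializable;
    import java.util.LinkedList;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCollection;
    import com.mongodb.DBCursor;

    // Placeholder Range POJO: min is inclusive; max == null marks the open-ended last range.
    class Range implements Serializable {
        final Object min;
        final Object max;
        Range(Object min, Object max) { this.min = min; this.max = max; }
    }

    class IdRangeComputer {

        // Walk the collection in _id order, emitting one Range per rangeSize documents,
        // mirroring the mongo shell loop above.
        static LinkedList<Range> computeIdRanges(DBCollection coll, int rangeSize) {
            LinkedList<Range> ranges = new LinkedList<Range>();
            Object minId = boundaryId(coll, 1);   // smallest _id
            Object maxId = boundaryId(coll, -1);  // largest _id
            if (minId == null) {
                return ranges;                    // empty collection
            }
            Object currId = minId;
            while (!currId.equals(maxId)) {
                Object prevId = currId;
                // Skip ahead rangeSize documents to find the next boundary _id.
                DBCursor cursor = coll.find(
                        new BasicDBObject("_id", new BasicDBObject("$gte", currId)),
                        new BasicDBObject("_id", 1))
                    .sort(new BasicDBObject("_id", 1))
                    .limit(rangeSize + 1);
                try {
                    while (cursor.hasNext()) {
                        currId = cursor.next().get("_id");
                    }
                } finally {
                    cursor.close();
                }
                ranges.add(new Range(prevId, currId));
            }
            // Final open-ended range, playing the role of the special last fragment
            // described above, so the document with _id == maxId is not lost.
            ranges.add(new Range(currId, null));
            return ranges;
        }

        // Fetch the first _id in the given sort order (1 = ascending, -1 = descending).
        private static Object boundaryId(DBCollection coll, int order) {
            DBCursor cursor = coll.find(new BasicDBObject(), new BasicDBObject("_id", 1))
                .sort(new BasicDBObject("_id", order))
                .limit(1);
            try {
                return cursor.hasNext() ? cursor.next().get("_id") : null;
            } finally {
                cursor.close();
            }
        }
    }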

You can play with the size of the ranges and the number of slices passed to parallelize to control the granularity of the parallelism.

I hope this helps.

Greetings,

Juan Rodriguez Hortala


I had the same combination of exceptions after importing a BSON file with mongorestore. Calling db.collection.reIndex() solved the problem for me.
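If you prefer to trigger the same reindex from the Java driver rather than the shell, something along these lines should work (a sketch using the legacy 2.x API and the test.input namespace from the question):

    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.MongoClient;

    public class ReindexExample {
        public static void main(String[] args) throws Exception {
            MongoClient client = new MongoClient("127.0.0.1", 27017);
            try {
                DB db = client.getDB("test");
                // Equivalent of db.input.reIndex() in the mongo shell:
                // rebuild all indexes on the collection restored with mongorestore.
                System.out.println(db.command(new BasicDBObject("reIndex", "input")));
            } finally {
                client.close();
            }
        }
    }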

