How to query when connecting MongoDB with Apache Spark

I am currently experimenting with Spark and MongoDB, using the mongodb-hadoop connector to connect Spark to MongoDB. Here is an example: https://github.com/plaa/mongo-spark . This example works well for me.

Then, based on this example, I used a larger dataset from https://github.com/10gen-interns/big-data-exploration , which has 6 million flight records. I want to query the MongoDB dataset and then do further processing.

The schema of the flight data is at https://gist.github.com/sweetieSong/6016700

Sample data:

{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ), "age" : 27, "airTime" : 316, "airlineId" : 19805, "arrDelay" : -37, "arrTime" : Date( 1336304580000 ), "carrier" : "AA", "carrierId" : "AA", "crsArrTime" : Date( 1336306800000 ), "crsDepTime" : Date( 1336294800000 ), "crsElapsedTime" : 380, "date" : Date( 1336262400000 ), "dayOfMonth" : 6, "dayOfWeek" : 7, "depDelay" : -5, "depTime" : Date( 1336294500000 ), "destAirport" : "LAX", "destAirportId" : 12892, "destCity" : "Los Angeles, CA", "destCityId" : 32575, "destState" : "California", "destStateId" : "CA", "destWAC" : 91, "distance" : 2475, "diverted" : true, "elapsedTime" : 348, "flightNum" : 1, "month" : 5, "numDivAirportLandings" : 0, "numFlights" : 1, "origAirport" : "JFK", "origAirportId" : 12478, "origCity" : "New York, NY", "origCityId" : 31703, "origState" : "New York", "origStateId" : "NY", "origWAC" : 22, "quarter" : 2, "tailNum" : "N323AA", "taxiIn" : 19, "taxiOut" : 13, "wheelsOff" : Date( 1336295280000 ), "wheelsOn" : Date( 1336303440000 ), "year" : 2012 } 

My Scala code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkContext
    import org.bson.BSONObject

    val sc = new SparkContext("local", "Scala Word Count")

    // Point the mongodb-hadoop input format at the flights collection.
    val config = new Configuration()
    config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
    config.set("mongo.input.query", "{destAirport: 'LAX'}")
    // config.set("mongo.input.query", "{_id.destAirport: 'LAX'}")

    val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

    println("We run scala .. count", mongoRDD.count())

For testing purposes, I just want to get all the records with destAirport 'LAX' first. I don't know what the query should look like, so I tried two different query formats: "{destAirport: 'LAX'}" and "{_id.destAirport: 'LAX'}".

When the application runs, the console displays this information:

    INFO MongoCollectionSplitter: Created split: min = {"_id": {"$oid": "51bf29d8ca69141e42097d7f"}}, max = {"_id": {"$oid": "51bf29dfca69141e420991ad"}}

    14/08/05 10:30:51 INFO Executor: Running task ID 751
    14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
    14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
    14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
    14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
    14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
    14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
    14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
    14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
    14/08/05 10:30:51 INFO Executor: Finished task ID 751

No matter what the query is (or even with no query at all), Spark always runs 1191 tasks. Each task prints similar messages, and mongoRDD.count() always returns 0.

My first question: what is the correct query?

Also, I used to think that with mongodb-hadoop, MongoDB first queries the entire collection and then sends the results back to Spark for processing. Now it seems that the collection is divided into many small parts, each part is queried separately, and the results for that part are sent to Spark. Is that right?

1 answer

My first question: what is the correct query?

I don’t think there is a “correct” query; you need to query based on the data you want to process.

Also, I used to think that with mongodb-hadoop, MongoDB first queries the entire collection and then sends the results back to Spark for processing. Now it seems that the collection is divided into many small parts, each part is queried separately, and the results for that part are sent to Spark. Is that right?
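To make the difference between the two query shapes from the question concrete, here is a minimal sketch in plain Scala. It does not use the MongoDB driver or mongodb-hadoop: documents are modelled as nested Maps, only equality filters are handled, and the names QueryShapeSketch, resolve and matches are made up for illustration. The sample record is a trimmed copy of the one in the question, with a plain string standing in for the ObjectId. The point it illustrates is that destAirport is a top-level field, while a path such as _id.destAirport looks for a sub-field under _id, which a scalar _id does not have.

```scala
object QueryShapeSketch {
  // A document is modelled as a nested Map; this is a simplification of BSON.
  type Doc = Map[String, Any]

  // Resolve a dotted path such as "a.b" by descending through nested Maps.
  def resolve(doc: Doc, path: String): Option[Any] = {
    def go(current: Any, rest: List[String]): Option[Any] = rest match {
      case Nil => Some(current)
      case key :: tail => current match {
        case m: Map[_, _] =>
          m.asInstanceOf[Map[String, Any]].get(key).flatMap(go(_, tail))
        case _ => None // a scalar (such as an ObjectId) has no sub-fields
      }
    }
    go(doc, path.split('.').toList)
  }

  // Equality filter only: every (path -> value) pair must match exactly.
  def matches(doc: Doc, filter: Map[String, Any]): Boolean =
    filter.forall { case (path, expected) => resolve(doc, path).contains(expected) }

  // Trimmed-down version of the sample flight record from the question.
  val sample: Doc = Map(
    "_id" -> "51bf19c4ca69141e42ddd1f7", // stands in for the ObjectId
    "carrier" -> "AA",
    "origAirport" -> "JFK",
    "destAirport" -> "LAX"
  )

  def main(args: Array[String]): Unit = {
    println(matches(sample, Map("destAirport" -> "LAX")))     // top-level field
    println(matches(sample, Map("_id.destAirport" -> "LAX"))) // path under _id
  }
}
```

In this model the first filter matches the sample record and the second does not. The log excerpt in the question shows the _id.destAirport form in effect for that run, so a count of 0 is at least consistent with this reading.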

I ran into the same problem.

I believe that newAPIHadoopRDD, given MongoInputFormat, does not take the query into account when calculating the splits. The query is applied only after the splits have been calculated. This means that no matter how selective your query is, the number of splits stays the same and is proportional to the size of the collection.

newAPIHadoopRDD ends up using StandaloneMongoSplitter. Note that this class does not use the query to calculate the split boundaries. It just uses MongoDB's internal splitVector command, and from the documentation at http://api.mongodb.org/internal/current/commands.html it looks like that command does not take the query into account either.

I have no good solution. A better approach would be to split the collection only after applying the query, but this requires a different splitter implementation. Here are some good details about the problem: http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/
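The two-step behaviour described above can be sketched as a small simulation in plain Scala. This is not the mongodb-hadoop code: Flight, Split, planSplits and readSplit are hypothetical names, integer ids stand in for ObjectIds, and a fixed docsPerSplit stands in for the size-based chunking that splitVector performs. It only illustrates why the number of tasks would stay the same whichever query is set.

```scala
object SplitPlanningSketch {
  final case class Flight(id: Int, destAirport: String)
  // A split is a half-open range of ids: [minId, maxId).
  final case class Split(minId: Int, maxId: Int)

  // Step 1: boundaries are derived from the collection alone; no query is involved.
  def planSplits(collection: Seq[Flight], docsPerSplit: Int): Vector[Split] = {
    val ids = collection.map(_.id).sorted
    ids.grouped(docsPerSplit).map(g => Split(g.head, g.last + 1)).toVector
  }

  // Step 2: each task reads its own id range and only then applies the query.
  def readSplit(collection: Seq[Flight], split: Split, query: Flight => Boolean): Seq[Flight] =
    collection.filter(f => f.id >= split.minId && f.id < split.maxId).filter(query)

  def main(args: Array[String]): Unit = {
    val airports = Vector("LAX", "JFK", "ORD", "SFO")
    val collection = (0 until 1000).map(i => Flight(i, airports(i % airports.size)))

    val splits = planSplits(collection, docsPerSplit = 100)
    val toLax: Flight => Boolean = _.destAirport == "LAX"
    val toNowhere: Flight => Boolean = _.destAirport == "XXX"

    // The task count is fixed before any query runs; only per-task results differ.
    println(s"tasks: ${splits.size}")
    println(s"LAX matches: ${splits.map(readSplit(collection, _, toLax).size).sum}")
    println(s"XXX matches: ${splits.map(readSplit(collection, _, toNowhere).size).sum}")
  }
}
```

In this model a query that matches nothing still produces one task per split, each of which reads zero documents, which mirrors the "Read 0.0 documents" lines in the question's log.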
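For comparison, a query-aware splitter of the kind suggested above might look roughly like the following. Again this is only a plain-Scala simulation under the same assumptions (hypothetical Flight, Split and planSplits names, integer ids, a fixed docsPerSplit); it is not an implementation against the real mongodb-hadoop splitter interface.

```scala
object QueryAwareSplitSketch {
  final case class Flight(id: Int, destAirport: String)
  final case class Split(minId: Int, maxId: Int) // half-open: [minId, maxId)

  // Apply the query first, then cut boundaries over the matching ids only.
  def planSplits(collection: Seq[Flight],
                 query: Flight => Boolean,
                 docsPerSplit: Int): Vector[Split] = {
    val matchingIds = collection.filter(query).map(_.id).sorted
    matchingIds.grouped(docsPerSplit).map(g => Split(g.head, g.last + 1)).toVector
  }

  def main(args: Array[String]): Unit = {
    // One flight in ten goes to LAX in this synthetic collection.
    val collection = (0 until 1000).map(i => Flight(i, if (i % 10 == 0) "LAX" else "JFK"))

    println(planSplits(collection, _.destAirport == "LAX", 100).size) // selective query
    println(planSplits(collection, _ => true, 100).size)              // no filtering
    println(planSplits(collection, _.destAirport == "XXX", 100).size) // matches nothing
  }
}
```

Here the number of splits tracks the number of matching documents rather than the size of the collection. Each id range can still contain non-matching documents, so a reader would have to apply the query again inside every split, and computing the boundaries needs a pass over the matching documents up front.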


Source: https://habr.com/ru/post/973540/

