I am currently experimenting with Spark and MongoDB, using the mongo-hadoop connector to handle the Spark-to-MongoDB connection. There is an example at https://github.com/plaa/mongo-spark, and it works well for me.
Then, building on that example, I used a larger dataset from https://github.com/10gen-interns/big-data-exploration, which has 6 million flight records. I want to query the MongoDB collection and then do further processing on the results.
The schema of the flight data is at https://gist.github.com/sweetieSong/6016700
Here is a sample document:
{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ), "age" : 27, "airTime" : 316, "airlineId" : 19805, "arrDelay" : -37, "arrTime" : Date( 1336304580000 ), "carrier" : "AA", "carrierId" : "AA", "crsArrTime" : Date( 1336306800000 ), "crsDepTime" : Date( 1336294800000 ), "crsElapsedTime" : 380, "date" : Date( 1336262400000 ), "dayOfMonth" : 6, "dayOfWeek" : 7, "depDelay" : -5, "depTime" : Date( 1336294500000 ), "destAirport" : "LAX", "destAirportId" : 12892, "destCity" : "Los Angeles, CA", "destCityId" : 32575, "destState" : "California", "destStateId" : "CA", "destWAC" : 91, "distance" : 2475, "diverted" : true, "elapsedTime" : 348, "flightNum" : 1, "month" : 5, "numDivAirportLandings" : 0, "numFlights" : 1, "origAirport" : "JFK", "origAirportId" : 12478, "origCity" : "New York, NY", "origCityId" : 31703, "origState" : "New York", "origStateId" : "NY", "origWAC" : 22, "quarter" : 2, "tailNum" : "N323AA", "taxiIn" : 19, "taxiOut" : 13, "wheelsOff" : Date( 1336295280000 ), "wheelsOn" : Date( 1336303440000 ), "year" : 2012 }
My Scala code:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

val sc = new SparkContext("local", "Scala Word Count")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query", "{destAirport: 'LAX'}")

// Build an RDD over the collection, as in the mongo-spark example above
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[MongoInputFormat],
  classOf[Object], classOf[BSONObject])

println("We run scala .. count", mongoRDD.count())
For testing purposes, I just want to fetch all the records where destAirport is 'LAX' first. I don't know what the query should look like, so I tried two different query formats (shown below): "{destAirport: 'LAX'}" and "{_id.destAirport: 'LAX '}".
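For reference, here is how each attempt was set on the Hadoop Configuration (one at a time); the strings are copied verbatim from what I tried, including the stray trailing space inside 'LAX ' in the second one:

// First format tried: match on the top-level destAirport field
config.set("mongo.input.query", "{destAirport: 'LAX'}")
// Second format tried: match on a field nested under _id
// (note the accidental trailing space inside 'LAX ')
config.set("mongo.input.query", "{_id.destAirport: 'LAX '}")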
When the application runs, the console displays output like this:
INFO MongoCollectionSplitter: Created split: min={ "_id" : { "$oid" : "51bf29d8ca69141e42097d7f"}}, max={ "_id" : { "$oid" : "51bf29dfca69141e420991ad"}}
14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751
No matter what the query is (even with no query set at all), Spark always runs 1191 tasks. Every task prints similar output, and mongoRDD.count() always returns 0.
My first question is: what is the correct query?
Also, I used to think that what mongodb-hadoop does is have MongoDB run the query against the entire collection first and then send the results back to Spark for processing. But now it seems to me that MongoDB splits the collection into many pieces, runs the query against each small piece, and then sends that piece's results to Spark. Is that right? (I've sketched this second model in the code below.)
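Here is a minimal runnable sketch of that second model, just to make the question concrete. IdRange and readSplit are hypothetical names of my own, not the connector's actual API; the _id bounds are copied from the log output above:

// Hypothetical sketch of my assumed model, NOT mongo-hadoop's actual code:
// the collection is partitioned into _id ranges, and every Spark task runs
// the same query restricted to its own range.
case class IdRange(min: String, max: String)

def readSplit(range: IdRange, query: String): Seq[String] =
  // stand-in for something like: collection.find(query).min({_id: range.min}).max({_id: range.max})
  Seq(s"documents matching $query with _id in [${range.min}, ${range.max}]")

// _id bounds copied from the log output above
val splits = Seq(
  IdRange("51bf29d8ca69141e42097d7f", "51bf29dfca69141e420991ad"),
  IdRange("51bf2f95ca69141e421904e5", "51bf2f9dca69141e42191913")
)
val results = splits.flatMap(readSplit(_, "{destAirport: 'LAX'}"))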