Spark job with async HTTP calls

I am building an RDD from a list of URLs and then trying to fetch the data with an asynchronous http call. I need all of the results before doing other calculations. Ideally, I would like the http calls to be made on different nodes for scaling considerations.

I did something like this:

    // init spark
    val sparkContext = new SparkContext(conf)
    val datas = Seq[String]("url1", "url2")

    // create rdd
    val rdd = sparkContext.parallelize[String](datas)

    // httpCall returns Future[String]
    val requests = rdd.map((url: String) => httpCall(url))

    // await all results (Future.sequence may be better)
    val responses = requests.map(r => Await.result(r, 10.seconds))

    // print responses
    responses.collect().foreach((s: String) => println(s))

    // stop spark
    sparkContext.stop()

This works, but the Spark job never finishes!

So I wonder what the best practices are for working with Future in Spark (or with Future[RDD]).

This use case seems pretty common to me, but I have not found an answer yet.

Best wishes

+6
4 answers

this use case seems pretty common

Not really, because it simply doesn't work the way you (probably) expect. Since each task operates on standard Scala Iterators, these operations are squashed together. This means that in practice all of the operations are blocking. Assuming you have three URLs ["x", "y", "z"], your code will execute in the following order:

    Await.result(httpCall("x"), 10.seconds)
    Await.result(httpCall("y"), 10.seconds)
    Await.result(httpCall("z"), 10.seconds)

You can easily reproduce the same behavior locally. If you want to execute your code asynchronously, you have to handle it explicitly with mapPartitions:

    rdd.mapPartitions(iter => {
      ??? // Submit requests
      ??? // Wait until all requests are completed and return an Iterator of results
    })

but this is relatively tricky. There is no guarantee that all the data for a given partition fits into memory, so you will probably need some batching mechanism as well.
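
For illustration only, a minimal sketch of such a batched mapPartitions, assuming httpCall returns a Future[String]; the batch size of 100 and the 10-second timeout are arbitrary placeholders:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Process each partition in fixed-size batches so that only one batch
    // of in-flight requests has to be held in memory at a time
    val results = rdd.mapPartitions { urls =>
      urls.grouped(100).flatMap { batch =>
        val inFlight: Future[Seq[String]] = Future.sequence(batch.map(httpCall))
        Await.result(inFlight, 10.seconds)
      }
    }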

That said, I cannot reproduce the problem you describe; it may be a configuration issue or a problem with httpCall itself.

On a side note, allowing a single timeout to kill a whole task does not look like a good idea.
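
One way around that, sketched here as an assumption rather than something from the question, is to catch the timeout per element so a slow URL only degrades its own result (reusing requests from the question's code and the imports above):

    import scala.util.Try

    // Each URL is awaited on its own; a timeout or failure yields a marker
    // string instead of failing the entire Spark task
    val responses = requests.map { r =>
      Try(Await.result(r, 10.seconds)).getOrElse("request failed or timed out")
    }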

+6

I could not find an easy way to achieve this. But after several retries, this is what I did, and it works for a huge list of queries. We mainly used this to batch a huge query into multiple subqueries.

    // Break down your huge workload into smaller chunks; in this case a huge query string is broken
    // down into a small set of subqueries.
    // If you need to optimize further, you can provide an explicit number of partitions when parallelizing.
    val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)

    // Then map each one of those to a Spark task; in this case each task is a Future that returns a string
    val tasks: RDD[Future[String]] = queries.map(query => {
      val task = makeHttpCall(query) // Method returns the http call response as a Future[String]
      task.recover {
        case ex => logger.error("recover: " + ex.printStackTrace())
      }
      task onFailure {
        case t => logger.error("execution failed: " + t.getMessage)
      }
      task
    })

    // Note: the http call is still not invoked; you are only including it as part of the lineage

    // Then, in each partition, combine all Futures (there can be several tasks per partition) with Future.sequence
    // and Await the result; this blocks until every future in that sequence is resolved
    val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
      val searchFuture: Future[Iterator[String]] = Future sequence f
      Await.result(searchFuture, threadWaitTime.seconds)
    }

    // Note: At this point you can apply any transformation to this rdd and it will be appended to the lineage.
    // When you perform an action on that rdd, the mapPartitions step is evaluated,
    // all the subqueries are fired as parallel http requests,
    // and the data is collected into a single rdd.

If you do not want to perform any transformation on the content, such as parsing the response payload, you can use foreachPartition instead of mapPartitions to perform all those http calls immediately.
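
For example, a rough sketch of that side-effect-only variant, reusing the makeHttpCall and threadWaitTime names (and the imports) from the block above:

    // Fire the requests purely for their side effects; no result RDD is built
    queries.foreachPartition { iter =>
      val inFlight = Future.sequence(iter.map(makeHttpCall).toList)
      Await.result(inFlight, threadWaitTime.seconds)
    }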

+2

This won't work.

You cannot expect the request objects to be distributed and the responses collected over the cluster by other nodes. If you try, the Spark call on the future will never end; the futures will never work in this case.

If your map() performs synchronous (http) requests, then collect the responses within the same action/transformation call and then subject the results (the responses) to further map/reduce/other calls.

In your case, rewrite the logic to collect the responses for each call synchronously and drop the notion of futures; then everything should be fine.
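
As a minimal illustration (httpCallSync here is a hypothetical blocking client, not something from the question):

    // The request runs synchronously inside map(), so the responses are
    // fully materialized before any further transformation touches them
    def httpCallSync(url: String): String = ??? // hypothetical blocking http client

    val responses = rdd.map(url => httpCallSync(url))
    responses.collect().foreach(println)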

+1

I finally did this with scalaj-http instead of Dispatch. The calls are synchronous, but that matches my use case.

I think the Spark job never finished with Dispatch because the Http connection was not closed properly.
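
For reference, a synchronous scalaj-http call inside map looks roughly like this (the timeout values are placeholders and error handling is omitted):

    import scalaj.http.Http

    // Each executor performs a blocking request; scalaj-http uses
    // HttpURLConnection under the hood
    val responses = rdd.map { url =>
      Http(url)
        .timeout(connTimeoutMs = 5000, readTimeoutMs = 10000)
        .asString
        .body
    }
    responses.collect().foreach(println)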

Best wishes

+1

Source: https://habr.com/ru/post/1244758/
