In Spark, can I find out which machine in the cluster stores a given RDD element, and then send a message to it?

I am new to Spark.

Given an RDD such as RDD = {"0", "1", "2", ... "99999"}, can I identify the machine in the cluster that stores a particular element (for example, "100")?

And then, can I combine some data and send it to that specific machine? I know that RDD partitioning is transparent to the user, but can I use something like keys/values to achieve this?

1 answer

In general, the answer is no, or at least not with the RDD API. If you can express your logic with graphs, you can try the message-based API in GraphX or Giraph. If not, then using Akka directly instead of Spark might be a better choice.

That said, there are some workarounds, though I wouldn't expect high performance from them. Let's start with some dummy data:

    import org.apache.spark.rdd.RDD

    val toPairs = (s: Range) => s.map(_.toChar.toString)

    val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
      (0, toPairs(97 to 100)),  // a-d
      (1, toPairs(101 to 107)), // e-k
      (2, toPairs(108 to 115))  // l-s
    )).flatMap { case (i, vs) => vs.map(v => (i, v)) }
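The `toPairs` helper just turns a range of character codes into one-letter strings, so it can be checked locally without Spark:

```scala
// toPairs maps character codes to one-letter strings, e.g. 97 -> "a"
val toPairs = (r: Range) => r.map(_.toChar.toString)

println(toPairs(97 to 100)) // Vector(a, b, c, d)
```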

and partition it using a custom partitioner:

    import org.apache.spark.Partitioner

    class IdentityPartitioner(n: Int) extends Partitioner {
      def numPartitions: Int = n
      def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }

    val partitioner = new IdentityPartitioner(4)
    val parts = rdd.partitionBy(partitioner)
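Note that `getPartition` simply returns the key itself, so every key must lie in `[0, numPartitions)`; a key outside that range would make Spark fail at runtime. A Spark-free sketch of the same routing rule:

```scala
// Spark-free version of IdentityPartitioner's rule: an Int key is its own
// partition index, so it must lie in [0, n).
def identityPartition(n: Int)(key: Int): Int = {
  require(key >= 0 && key < n, s"key $key is outside [0, $n)")
  key
}

println((0 to 3).map(identityPartition(4))) // Vector(0, 1, 2, 3)
```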

Now we have an RDD with 4 partitions, including one empty one:

    parts.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size))).collect
    // Array[(Int, Int)] = Array((0,4), (1,7), (2,8), (3,0))
  • The simplest thing you can do is to map each record to its target partition and repartition. First, a dummy map function and a routing helper:

      // Dummy map function
      def transform(s: String) =
        Map("e" -> "x", "k" -> "y", "l" -> "z").withDefault(identity)(s)

      // Map a string to its target partition
      def address(curr: Int, s: String) = {
        val m = Map("x" -> 3, "y" -> 3, "z" -> 3).withDefault(x => curr)
        (m(s), s)
      }

    and "send" the data:

      val transformed: RDD[(Int, String)] = parts
        // Emit pairs (partition, string)
        .map { case (i, s) => address(i, transform(s)) }
        // Repartition
        .partitionBy(partitioner)

      transformed
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
        .collect
      // Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))
  • Another approach is to collect the "messages":

      val tmp = parts.mapValues(s => transform(s))

      val messages: Map[Int, Iterable[String]] = tmp
        .flatMap { case (i, s) =>
          val target = address(i, s)
          if (target != (i, s)) Seq(target) else Seq()
        }
        .groupByKey
        .collectAsMap

    create a broadcast variable:

     val messagesBD = sc.broadcast(messages) 

    and use it to send messages:

      val transformed = tmp
        .filter { case (i, s) => address(i, s) == (i, s) }
        .mapPartitionsWithIndex((i, iter) => {
          // keep only the values, then append the messages delivered here
          val combined = iter.map(_._2) ++ messagesBD.value.getOrElse(i, Seq())
          combined.map((i, _))
        }, true)

      transformed
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
        .collect
      // Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))
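Since `transform` and `address` are plain Scala, the whole message-collecting flow can be simulated with local collections to verify the expected partition sizes without a cluster (a sketch using the same dummy data as above):

```scala
// Spark-free simulation of the message-collecting approach, using local
// collections to verify the expected per-partition counts.
def transform(s: String): String =
  Map("e" -> "x", "k" -> "y", "l" -> "z").withDefault(identity)(s)

def address(curr: Int, s: String): (Int, String) = {
  val m = Map("x" -> 3, "y" -> 3, "z" -> 3).withDefault(_ => curr)
  (m(s), s)
}

val toPairs = (r: Range) => r.map(_.toChar.toString)
val data: Seq[(Int, String)] = Seq(
  (0, toPairs(97 to 100)),  // a-d
  (1, toPairs(101 to 107)), // e-k
  (2, toPairs(108 to 115))  // l-s
).flatMap { case (i, vs) => vs.map(v => (i, v)) }

val tmp = data.map { case (i, s) => (i, transform(s)) }

// "Messages": records whose target partition differs from the current one
val messages: Map[Int, Seq[String]] = tmp
  .flatMap { case (i, s) =>
    val target = address(i, s)
    if (target != (i, s)) Seq(target) else Seq()
  }
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2)) }

// Records that stay in place, plus delivered messages, counted per partition
val stays = tmp.filter { case (i, s) => address(i, s) == (i, s) }
val sizes = (0 to 3).map { i =>
  (i, stays.count(_._1 == i) + messages.getOrElse(i, Seq()).size)
}

println(sizes) // Vector((0,4), (1,5), (2,7), (3,3))
```

The "x", "y", "z" records all migrate from partitions 1 and 2 to partition 3, matching the counts reported by the Spark version.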

Source: https://habr.com/ru/post/1235722/
