In general the answer is no, or at least not with the RDD API. If you can express your logic with graphs, you can try the message-passing API in GraphX or Giraph. If not, then using Akka directly instead of Spark might be a better choice.
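For reference, this is roughly what Pregel-style message passing looks like in GraphX, a minimal single-source shortest paths sketch adapted from the GraphX programming guide (the randomly generated graph and the source vertex id are just placeholders):

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Placeholder graph with Double edge weights
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

val sourceId: VertexId = 42L // arbitrary source vertex

// All vertices start at infinity except the source
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // vertex program
  triplet => {                                    // send a message if it improves the distance
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b)                        // merge incoming messages
)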
Still, there are some workarounds, but I wouldn't expect high performance. Let's start with some dummy data:
import org.apache.spark.rdd.RDD

val toPairs = (s: Range) => s.map(_.toChar.toString)

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
    (0, toPairs(97 to 100)),  // a - d
    (1, toPairs(101 to 107)), // e - k
    (2, toPairs(108 to 115))  // l - s
)).flatMap{ case (i, vs) => vs.map(v => (i, v)) }
and partition it using a custom partitioner:
import org.apache.spark.Partitioner

class IdentityPartitioner(n: Int) extends Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val partitioner = new IdentityPartitioner(4)
val parts = rdd.partitionBy(partitioner)
Now we have an RDD with 4 partitions, including an empty one:
parts.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size))).collect
// Array[(Int, Int)] = Array((0,4), (1,7), (2,8), (3,0))
The simplest thing you can do is to mark each value with its target partition and repartition. First a dummy map function and an addressing helper:
// Dummy map function
def transform(s: String) =
  Map("e" -> "x", "k" -> "y", "l" -> "z").withDefault(identity)(s)

// Map String to partition
def address(curr: Int, s: String) = {
  val m = Map("x" -> 3, "y" -> 3, "z" -> 3).withDefault(x => curr)
  (m(s), s)
}
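To illustrate what these do (the results follow directly from the maps above):

transform("e")   // "x"
transform("a")   // "a" (unchanged)
address(1, "x")  // (3, "x") -> routed to partition 3
address(1, "f")  // (1, "f") -> stays in partition 1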
and "send" the values:
val transformed: RDD[(Int, String)] = parts
  // Emit pairs (partition, string)
  .map{ case (i, s) => address(i, transform(s)) }
  // Repartition
  .partitionBy(partitioner)

transformed
  .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  .collect
// Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))
Another approach is to collect "messages":
val tmp = parts.mapValues(s => transform(s))

val messages: scala.collection.Map[Int, Iterable[String]] = tmp
  .flatMap{ case (i, s) =>
    val target = address(i, s)
    if (target != (i, s)) Seq(target) else Seq()
  }
  .groupByKey
  .collectAsMap
// messages now holds only the values routed elsewhere, here: 3 -> (x, y, z)
create a broadcast:
val messagesBD = sc.broadcast(messages)
and use it to send messages:
val transformed = tmp
  // Keep only the values that stay in place
  .filter{ case (i, s) => address(i, s) == (i, s) }
  // Append the messages addressed to each partition
  .mapPartitionsWithIndex((i, iter) => {
    val combined = iter.map(_._2) ++ messagesBD.value.getOrElse(i, Seq())
    combined.map((i, _))
  }, true) // preservesPartitioning

transformed
  .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  .collect
// Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))