Merge / Merge Identifier Strings in a Spark

Given a file filled with identifier strings, e.g.

i1, i2, i5
i3, i4
i2, i6, i7
i4, i8
i9, i3

How can you join them by linking the same identifiers? So, for the example above, line 1 is connected to line 3 through i2, and line 2 is connected to lines 4 and 5 through i4 and i3, respectively. This will give you the following (duplicates removed)

i1, i2, i5, i6, i7
i3, i4, i8, i9

I could do this by going line by line, but wondered how you would do this functionally?

+4
source share
4 answers

When you use Apache Spark, you can use the GraphX ​​built-in component to do this work.

import org.apache.spark.graphx._

def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)

val data = sc.parallelize(List(
    "1\t5\t3", 
    "3\t9\t30",
    "7\t10\t12",
    "10\t7\t13"
))

val prep = data.map(x => x.split("\t").map(_.toLong).toList)

val vertex = prep
  .flatMap(x => x)
  .map(x => x -> s"ID=$x")

val edges = prep
  .map(x => cross(x, x))
  .flatMap(x => x)
  .map(x => new Edge(x._1, x._2, "likes"))

val graph = Graph(vertex, edges)
val linked = graph
  .connectedComponents
  .vertices
  .map(_.swap)
  .groupByKey

linked.take(10).foreach(println)

Print the following result:

(1,CompactBuffer(30, 3, 9, 1, 5))
(7,CompactBuffer(7, 10, 12, 13))

Cross , .

connectedComponents , , , Id → "".

:

graph.connectedComponents.vertices.take(10).foreach(println)

(30,1)
(1,1)
(3,1)
(5,1)
(7,7)
(9,1)
(10,7)
(12,7)
(13,7)

, 1 7 " " . , .

+1

, Spark 2.0 +

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate;
val df = spark.sparkContext.parallelize(
  List(
    "i1, i2, i5",
    "i3, i4",
    "i2, i6, i7",
    "i4, i8")
)

//Group lines with tokens count (determing by the last occurence of comma)
val rddGroupByTokensCount = df.map(row => (row.lastIndexOf(','), row.split(", ")))
  .groupBy(_._1)

//Now gather all the token to single place with flatMap and drop duplicates
val rddUniqueTokens = rddGroupByTokensCount.map(_._2.flatMap(_._2).toSet)

//print grouped unique tokens by the count in each line
rddUniqueTokens.collect().map(println)

:

Set(i5, i1, i7, i2, i6)
Set(i3, i4, i8)
+1

O(n * n), , , O(n * k), k - . :

val input = ...//I will assume your input is an RDD[List]

val idArray = Array(id1, id2, id3, id4, id5, id6, id6)//Array containing all IDs
val result = sc.parallelize(idArray, k).map(x => (x, x))  
input = input.map(x => (x(0), if(x.length > 0) x.slice(1, x.length) else null))

//If you can afford to persist it would help greatly:
result.persist
input.persist

//We can make this loop smaller if k is large and your lists are small
//by setting the upper bound of the range to the length of the longest list.
//I'll leave this decision up to you.
for (i <- 0 to k){ 
    result = result.cogroup(input)
    input = input.map((t: (x, y)) => (y(0), if(y.length > 0) y.slice(1, y.length) else null))
}
result.map((t: (x, y)) => y.distinct)//we want distinct lists in output

result.unpersist
input.unpersist
0

So this is probably not optimal, but I figured it would cost independently. It is assumed that your input file is small enough to be stored in memory (because it's all Scala vanilla).

I decided to solve this problem by considering these identifiers as adjacencies on the graph, and then using BFS to list all connected components.

/* Input, can be read from file easily by splitting on ", " */
val lines = List(List("i1", "i2", "i5"),
    List("i3", "i4"),
    List("i2", "i6", "i7"),
    List("i4", "i8"),
    List("i9", "i3"))

/* finds all sequential pairs */
val pairs = lines.flatMap(x => x.dropRight(1).zip(x.drop(1)))

/* create an empty adjacency map: id -> (Set of adjacent vertices) */
val defMap = Map[String, Set[String]]().withDefaultValue(Set[String]())

/* populate the default map with the actual (symmetric) adjacencies */
val adjMap = pairs.foldLeft{defMap}(
    (acc, x) => acc + (x._1 -> (acc(x._1) + x._2)) + (x._2 -> (acc(x._2) + x._1)))

/* BFS algo on map representation of graph */
def mapBFS(adjMap: Map[String, Set[String]]): List[List[String]] =
{
    val v = adjMap.keys
    var globalVisits = List[String]()
    def BFS_r(elems: List[String], visited: List[List[String]]): List[String] =
    {
        val newNeighbors = elems.flatMap(adjMap(_)).filterNot(visited.flatten.contains).distinct
        if (newNeighbors.isEmpty)
            visited.flatten
        else
            BFS_r(newNeighbors, newNeighbors :: visited)
    }
    v.flatMap(x =>{
        if (globalVisits.contains(x))
            None
        else
        {
            val vi: List[String] = BFS_r(List(x), List(List(x)))
            globalVisits = globalVisits ++ vi
            Some(vi)
        }
    }).toList
}
mapBFS(adjMap).foreach{println}

Which gives the following conclusion:

List(i7, i1, i6, i2, i5)
List(i8, i4, i3, i9)
0
source

Source: https://habr.com/ru/post/1661122/


All Articles