Splitting strings in Apache Spark using Scala

I have a dataset that contains strings in the format (tab is split):

Title<\t>Text

Now for every word in Text, I want to create a pair (Word,Title). For instance:

ABC      Hello World

gives me

(Hello, ABC)
(World, ABC)

Using Scala, I wrote the following:

val file = sc.textFile("s3n://file.txt")
val title = file.map(line => line.split("\t")(0))
val wordtitle = file.map(line => (line.split("\t")(1).split(" ").map(word => (word, line.split("\t")(0)))))

But this gives me the following result:

[Lscala.Tuple2;@2204b589
[Lscala.Tuple2;@632a46d1
[Lscala.Tuple2;@6c8f7633
[Lscala.Tuple2;@3e9945f3
[Lscala.Tuple2;@40bf74a0
[Lscala.Tuple2;@5981d595
[Lscala.Tuple2;@5aed571b
[Lscala.Tuple2;@13f1dc40
[Lscala.Tuple2;@6bb2f7fa
[Lscala.Tuple2;@32b67553
[Lscala.Tuple2;@68d0b627
[Lscala.Tuple2;@8493285

How can I solve this?

Further reading

What I want to achieve is to count the number of Words found in Text for a specific Title.

I wrote the following code:

val wordcountperfile = file.map(line => (line.split("\t")(1).split(" ").flatMap(word => word), line.split("\t")(0))).map(word => (word, 1)).reduceByKey(_ + _)

But this does not work. Any pointers on this are appreciated. Thanks!

4 answers

You need to flatMap the RDD. In Scala:

val fileRdd = sc.textFile("s3n://file.txt")
// RDD[ String ]

val splitRdd = fileRdd.map( line => line.split("\t") )
// RDD[ Array[ String ] ]

val yourRdd = splitRdd.flatMap( arr => {
  val title = arr( 0 )
  val text = arr( 1 )
  val words = text.split( " " )
  words.map( word => ( word, title ) )
} )
// RDD[ ( String, String ) ]

// Now, if you want to print this...
yourRdd.foreach( { case ( word, title ) => println( s"{ $word, $title }" ) } )

// if you want to count ( this count is for non-unique words), 
val countRdd = yourRdd
  .groupBy( { case ( word, title ) => title } )  // group by title
  .map( { case ( title, iter ) => ( title, iter.size ) } ) // count for every title
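
If the goal is the number of occurrences of each word within a title (what the question's wordcountperfile attempt was aiming at), a minimal sketch on top of yourRdd could look like this (pairCounts is just an illustrative name):

// count occurrences of each ( word, title ) pair
val pairCounts = yourRdd
  .map( { case ( word, title ) => ( ( word, title ), 1 ) } )
  .reduceByKey( _ + _ )
// RDD[ ( ( String, String ), Int ) ]

pairCounts.foreach( { case ( ( word, title ), n ) => println( s"$word appears $n time(s) in $title" ) } )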

Another option is the DataFrame API. Read the file with "\t" as the delimiter:

import org.apache.spark.sql.functions.{split, explode, count}
import spark.implicits._

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

Then split the text column on spaces and explode it:

val df2 = df.select($"title", explode(split($"text", " ")).as("words"))

Finally, group by title and count the words:

val countDf = df2.groupBy($"title").agg(count($"words"))
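
If you want the count of each individual word per title rather than the total, a small sketch grouping on both columns (wordCountDf is an illustrative name):

// occurrences of each word within each title
val wordCountDf = df2.groupBy($"title", $"words").count()
wordCountDf.show()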

Using the DataFrame API:

// read into DataFrame
val viewsDF=spark.read.text("s3n://file.txt")

// Split each line on the tab into two columns
val splitedViewsDF = viewsDF
  .withColumn("col1", split($"value", "\\t").getItem(0))
  .withColumn("col2", split($"value", "\\t").getItem(1))
  .drop($"value")

scala> val viewsDF=spark.read.text("spark-labs/data/wiki-pageviews.txt")
viewsDF: org.apache.spark.sql.DataFrame = [value: string]

scala> viewsDF.printSchema
root
 |-- value: string (nullable = true)


scala> viewsDF.limit(5).show
+------------------+
|             value|
+------------------+
|  aa Main_Page 3 0|
|  aa Main_page 1 0|
|  aa User:Savh 1 0|
|  aa Wikipedia 1 0|
|aa.b User:Savh 1 0|
+------------------+


scala> val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).withColumn("col3", split($"value", "\\s+").getItem(2)).drop($"value")
splitedViewsDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]

scala>

scala> splitedViewsDF.printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)


scala> splitedViewsDF.limit(5).show
+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|  aa|Main_Page|   3|
|  aa|Main_page|   1|
|  aa|User:Savh|   1|
|  aa|Wikipedia|   1|
|aa.b|User:Savh|   1|
+----+---------+----+


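Applied to the tab-separated Title/Text layout from the question, the same pattern might look like the sketch below (column names are only illustrative):

import org.apache.spark.sql.functions.{split, explode}
import spark.implicits._

val raw = spark.read.text("s3n://file.txt")

// split each line on the tab into a title column and a text column
val titled = raw
  .withColumn("title", split($"value", "\\t").getItem(0))
  .withColumn("text", split($"value", "\\t").getItem(1))
  .drop($"value")

// explode the text into (word, title) rows, then count words per title
val pairs = titled.select(explode(split($"text", " ")).as("word"), $"title")
val counts = pairs.groupBy($"title").count()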

Note that .map( line => line.split("\t") ) throws:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 1485, ip-172-31-113-181.us-west-2.compute.internal, executor 10): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

if the last column is empty. A better approach is explained here: Split 1 column into 3 columns in Spark Scala.
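
One way to avoid that (not from the original answer, just a common guard) is to split with a negative limit, which keeps trailing empty fields, and to skip rows that still come up short:

// -1 keeps trailing empty strings, so a missing last column yields "" instead of a shorter array
val safeSplitRdd = fileRdd
  .map( line => line.split( "\t", -1 ) )
  .filter( arr => arr.length >= 2 )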
