Add column to Data Frame in Apache Spark 1.3

Is it possible, and what would be the most effective and neat method, to add a column to a data frame?

More specifically, the column should serve as a row identifier for an existing data frame.

In a simplified case, reading from a file without tokenizing it, I can think of something like the code below (in Scala), but it fails with errors (on line 3), and in any case it does not look like the best route possible:

    var dataDF = sc.textFile("path/file").toDF()
    val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
    dataDF = dataDF.withColumn("ID", rowDF("ID"))
+43
scala dataframe apache-spark
Apr 07 '15 at 3:59
4 answers

Some time has passed since I posted the question, and it seems that some other people would like an answer as well. Here is what I found.

The original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so that the rows' order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:

    sc.textFile(file).
      zipWithIndex().
      map { case (d, i) => i.toString + delimiter + d }.
      map(_.split(delimiter)).
      map(s => Row.fromSeq(s.toSeq))
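
To get an actual DataFrame back out of this RDD of Rows, a schema still has to be supplied. A minimal sketch, assuming every column is kept as a string (the field names below are made up for illustration):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // "ID" plus the file's original fields; names and types are illustrative
    val schema = StructType(
      Seq("ID", "field1", "field2").map(name => StructField(name, StringType, nullable = true)))

    val rowsRdd = sc.textFile(file).
      zipWithIndex().
      map { case (d, i) => i.toString + delimiter + d }.
      map(_.split(delimiter)).
      map(s => Row.fromSeq(s.toSeq))

    val dataDF = sqlContext.createDataFrame(rowsRdd, schema)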

Regarding the general case of adding any column to any data frame:

The "closest" to this functionality in the Spark API are withColumn and withColumnRenamed . According to Scala docs , the former returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can only work with this data frame, i.e. Given two data frames df1 and df2 with col column:

    val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
    val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL

Therefore, unless you can transform a column of an existing data frame into the form you need, you cannot use withColumn or withColumnRenamed to attach arbitrary columns (standalone columns or columns of other data frames).

As noted above, the workaround may be to use join: it would be rather messy, although possible. Adding unique keys to both data frames (or columns), e.g. with zipWithIndex as above, and then joining on those keys may work, though how efficient that would be is another question.
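
A rough sketch of that join-based workaround, assuming Spark 1.3-era APIs (the helper and the column names are invented for illustration):

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Append a zipWithIndex-based key column so two data frames can be joined row by row
    def withRowIndex(df: DataFrame, idxName: String): DataFrame = {
      val schema = StructType(df.schema.fields :+ StructField(idxName, LongType, nullable = false))
      val rdd = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
      df.sqlContext.createDataFrame(rdd, schema)
    }

    val left = withRowIndex(df1, "idx1")
    val right = withRowIndex(df2, "idx2")
    val combined = left.join(right, left("idx1") === right("idx2")) // df1's rows alongside df2's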

Clearly, adding a column to a data frame is not a simple operation in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it is still very important to have this basic functionality available, even with performance caveats.

+44
Apr 29 '15 at 17:43

Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    df.withColumn("newName", lit("newValue"))

I use this when I need a value that is not related to the existing columns of the data frame.
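
lit is not limited to strings; any supported literal value works. A small sketch (the column names here are invented):

    // Constant columns of various types; lit wraps a plain Scala value in a Column
    val tagged = df.withColumn("source", lit("fileA"))
                   .withColumn("isValid", lit(true))
                   .withColumn("weight", lit(1.0))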

This is similar to @NehaM's answer, but simpler.

+23
May 31 '16 at 15:57

I took help from the answer above. However, I find it incomplete if we want to modify a DataFrame, and the APIs are slightly different in Spark 1.6. zipWithIndex() returns a tuple of (Row, Long) containing each row and its corresponding index. We can use it to create a new Row according to our needs.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Prepend the index (as a String) to each row, and a "Row number" field to the schema
    val rdd = df.rdd.zipWithIndex()
      .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
    val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
    sqlContext.createDataFrame(rdd, newstructure).show

I hope this will be helpful.

+6
May 2 '16 at 11:45

You can use row_number with a Window function, as shown below, to get a distinct identifier for each row in the data frame.

 df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe")) 

You can also use monotonically_increasing_id for the same:

 df.withColumn("ID", monotonically_increasing_id()) 

And there are other ways.

+1
Jul 16 '17 at 2:11


