How to add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I tried the following without any success:

    type(randomed_hours)  # => list

    # Create in Python and transform to RDD
    new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
    spark_new_col = sqlContext.createDataFrame(new_col)
    my_df_spark.withColumn("hours", spark_new_col["new_col"])

I also got an error using the following:

    my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))

So, how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?

+96
python dataframe apache-spark pyspark apache-spark-sql spark-dataframe
Nov 12 '15 at 21:14
7 answers

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column to a Spark DataFrame?):

    from pyspark.sql.functions import lit

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

    df_with_x4 = df.withColumn("x4", lit(0))
    df_with_x4.show()

    ## +---+---+-----+---+
    ## | x1| x2|   x3| x4|
    ## +---+---+-----+---+
    ## |  1|  a| 23.0|  0|
    ## |  3|  B|-23.0|  0|
    ## +---+---+-----+---+

by transforming an existing column:

    from pyspark.sql.functions import exp

    df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
    df_with_x5.show()

    ## +---+---+-----+---+--------------------+
    ## | x1| x2|   x3| x4|                  x5|
    ## +---+---+-----+---+--------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9|
    ## |  3|  B|-23.0|  0|1.026187963170189...|
    ## +---+---+-----+---+--------------------+

included using a join:

    from pyspark.sql.functions import col

    lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    df_with_x6 = (df_with_x5
                  .join(lookup, col("x1") == col("k"), "leftouter")
                  .drop("k")
                  .withColumnRenamed("v", "x6"))
    df_with_x6.show()

    ## +---+---+-----+---+--------------------+----+
    ## | x1| x2|   x3| x4|                  x5|  x6|
    ## +---+---+-----+---+--------------------+----+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|
    ## +---+---+-----+---+--------------------+----+

or generated with a function / udf:

    from pyspark.sql.functions import rand

    df_with_x7 = df_with_x6.withColumn("x7", rand())
    df_with_x7.show()

    ## +---+---+-----+---+--------------------+----+-------------------+
    ## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
    ## +---+---+-----+---+--------------------+----+-------------------+
    ## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
    ## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
    ## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
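
As a rough illustration of that difference, here is the same derived column computed both ways (a sketch that assumes the df_with_x4 DataFrame from above; exp is evaluated as a Catalyst expression on the JVM, while the UDF ships every value to a Python worker and back):

    import math
    from pyspark.sql.functions import exp, udf
    from pyspark.sql.types import DoubleType

    # built-in function: stays inside Catalyst / the JVM
    with_builtin = df_with_x4.withColumn("x5", exp("x3"))

    # Python UDF: each value is serialized to a Python worker and back
    exp_udf = udf(lambda v: math.exp(v), DoubleType())
    with_udf = df_with_x4.withColumn("x5", exp_udf("x3"))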

If you want to add the content of an arbitrary RDD as a column, you can (see the sketch after this list):

  • add row numbers to the existing DataFrame
  • call zipWithIndex on the RDD and convert it to a DataFrame
  • join both using the index as the join key
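
A minimal sketch of that approach, assuming the df defined above and a hypothetical values_rdd that holds the new column's values in the same row order:

    from pyspark.sql import Row

    # 1) add row numbers to the existing DataFrame
    df_indexed = (df.rdd
                  .zipWithIndex()
                  .map(lambda row_idx: Row(idx=row_idx[1], **row_idx[0].asDict()))
                  .toDF())

    # 2) call zipWithIndex on the RDD and convert it to a DataFrame
    new_col = (values_rdd
               .zipWithIndex()
               .map(lambda val_idx: Row(idx=val_idx[1], hours=val_idx[0]))
               .toDF())

    # 3) join both using the index as the join key, then drop it
    result = df_indexed.join(new_col, "idx").drop("idx")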
+155
Nov 12 '15 at 23:37

To add a column using a UDF:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    df = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

    def valueToCategory(value):
        if value == 1:
            return 'cat1'
        elif value == 2:
            return 'cat2'
        # ... more categories here
        else:
            return 'n/a'

    # NOTE: it seems that calls to udf() must be after SparkContext() is called
    udfValueToCategory = udf(valueToCategory, StringType())
    df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
    df_with_cat.show()

    ## +---+---+-----+---------+
    ## | x1| x2|   x3| category|
    ## +---+---+-----+---------+
    ## |  1|  a| 23.0|     cat1|
    ## |  3|  B|-23.0|      n/a|
    ## +---+---+-----+---------+
+50
May 16 '16 at 10:04

For Spark 2.0

    # assumes schema has 'age' column
    df.select('*', (df.age + 10).alias('agePlusTen'))
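
For comparison, the same column can also be added with withColumn (a minimal sketch, assuming the same df with an age column):

    df.withColumn('agePlusTen', df.age + 10)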
+25
Aug 31 '16 at 21:18

I would like to offer a generalized example for a very similar use case:

Use case: I have a CSV consisting of:

    First|Third|Fifth
    data|data|data
    data|data|data
    ...billion more lines

I need to perform some transformations, and the final CSV should look like this:

    First|Second|Third|Fourth|Fifth
    data|null|data|null|data
    data|null|data|null|data
    ...billion more lines

I have to do this because this is the schema defined by some model, and my final data needs to be interoperable with SQL bulk inserts and other such things.

So:

1) I read the original csv using spark.read and call it "df".

2) I am doing something with the data.

3) I add null columns using this script:

    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    outcols = []
    for column in MY_COLUMN_LIST:
        if column in df.columns:
            outcols.append(column)
        else:
            outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

    df = df.select(outcols)

This way you can structure your schema after loading a CSV (this also works for reordering columns if you have to do it for many tables).
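
Here is a self-contained toy version of that script (the SparkSession name spark, the column names, and the sample data are hypothetical):

    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    MY_COLUMN_LIST = ["First", "Second", "Third", "Fourth", "Fifth"]
    df = spark.createDataFrame([("data", "data", "data")], ["First", "Third", "Fifth"])

    # keep existing columns, fill the missing ones with typed nulls
    outcols = [c if c in df.columns
               else lit(None).cast(StringType()).alias(c)
               for c in MY_COLUMN_LIST]
    df.select(outcols).show()

    ## +-----+------+-----+------+-----+
    ## |First|Second|Third|Fourth|Fifth|
    ## +-----+------+-----+------+-----+
    ## | data|  null| data|  null| data|
    ## +-----+------+-----+------+-----+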

0
Mar 02 '18 at 15:10

The simplest way to add a column is to use withColumn. Since the DataFrame is created using sqlContext, you have to specify the schema, or it can be inferred from the dataset by default. If the schema is specified explicitly, the workload becomes tedious every time it changes.

Below is an example that you can consider:

    from pyspark.sql import SQLContext
    from pyspark.sql.types import *

    sqlContext = SQLContext(sc)  # SparkContext will be sc by default

    # Read the dataset of your choice (already loaded with schema)
    Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")

    # For instance the data has 30 columns from col1, col2, ... col30.
    # If you want to add a 31st column, you can do so as follows:
    Data = Data.withColumn("col31", "Code goes here")

    # Check the change
    Data.printSchema()
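
Note that withColumn expects a Column expression as its second argument, so the placeholder string "Code goes here" above will not run as written. Here is a hedged, concrete sketch with hypothetical options filled in:

    from pyspark.sql import SQLContext
    from pyspark.sql.functions import lit

    sqlContext = SQLContext(sc)

    # hypothetical reader options; adjust to your file
    Data = sqlContext.read.csv("/path", header=True, inferSchema=True, sep=",")

    # the new column must be a Column expression, e.g. a literal or a derived value
    Data = Data.withColumn("col31", lit("default"))

    Data.printSchema()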
0
Jun 12 '19 at 5:03

You can define a new UDF when adding a column_name:

    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    u_f = F.udf(lambda: yourstring, StringType())
    a.select(u_f().alias('column_name'))
-1
Dec 27 '16 at 8:42
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    func_name = udf(
        lambda val: val,  # do sth to val
        StringType()
    )

    df.withColumn('new_col', func_name(df.old_col))
-1
May 03 '17 at 10:09


