You are doing everything you miss correctly, over(window expression) on lag
val df = sc.parallelize(Seq((201601, 100.5), (201602, 120.6), (201603, 450.2), (201604, 200.7), (201605, 121.4))).toDF("date", "volume") val w = org.apache.spark.sql.expressions.Window.orderBy("date") import org.apache.spark.sql.functions.lag val leadDf = df.withColumn("new_col", lag("volume", 1, 0).over(w)) leadDf.show() +------+------+-------+ | date|volume|new_col| +------+------+-------+ |201601| 100.5| 0.0| |201602| 120.6| 100.5| |201603| 450.2| 120.6| |201604| 200.7| 450.2| |201605| 121.4| 200.7| +------+------+-------+
This code was run on Spark shell 2.0.2
source share