You can solve this problem without a Python-side UDF or switching to an RDD. First of all, since you are using ISO 8601 strings, your data can be cast directly to date or timestamp:
from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select(
    "*",
    col("d_str").cast("date").alias("dt"),
    col("d_str").cast("timestamp").alias("ts"))
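As a quick sanity check (not in the original answer, but assuming the df_casted frame defined above), you can inspect the resulting schema; the commented output is roughly what you should expect:

# Confirm that the casts produced proper date and timestamp columns
df_casted.printSchema()
# root
#  |-- d_str: string (nullable = true)
#  |-- dt: date (nullable = true)
#  |-- ts: timestamp (nullable = true)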
This saves one round trip between the JVM and Python. There are also several ways you can approach the second part. Date only:
from pyspark.sql.functions import current_date, datediff, unix_timestamp

df_casted.where(datediff(current_date(), col("dt")) < 7)
Timestamps:
def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))
You can also take a look at current_timestamp and date_sub.
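As a rough illustration only (not part of the original answer, and reusing the df_casted frame and the days helper defined above), the same "last 7 days" filters could be expressed with those two functions:

from pyspark.sql.functions import col, current_date, current_timestamp, date_sub

# Date version: keep rows whose date is within the last 7 days
df_casted.where(col("dt") >= date_sub(current_date(), 7))

# Timestamp version: cast both sides to epoch seconds and compare
df_casted.where(
    current_timestamp().cast("long") - col("ts").cast("long") < days(7))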
Note: I would not use DataFrame.map. Instead, it is better to use DataFrame.rdd.map. This will save you some work when switching to 2.0+.
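For completeness, a minimal sketch of the rdd.map pattern mentioned above (the lambda is only a placeholder transformation, not something the original answer prescribes):

# Go through the underlying RDD explicitly rather than calling map on the
# DataFrame itself; this code keeps working after the move to 2.0+.
dates_rdd = df_casted.rdd.map(lambda row: row.dt)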