PySpark: filtering a DataFrame by date range when the date field is a string

My DataFrames contain one field that holds a date, stored as a string, for example:

'2015-07-02T11:22:21.050Z' 

I need to filter a DataFrame by date to get only the records from the last week. So I tried a map approach, converting the string dates to datetime objects with strptime:

    def map_to_datetime(row):
        format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
        row.date = datetime.strptime(row.date, format_string)

    df = df.map(map_to_datetime)

and then I would apply the filter as

 df.filter(lambda row: row.date >= (datetime.today() - timedelta(days=7))) 

The mapping works, but the filter fails with:

TypeError: condition should be string or Column

Is there a way to make this filtering work, or should I change the approach, and if so, how?

2 answers

You can solve this without running Python code on the workers and without switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to a date or a timestamp:

    from pyspark.sql.functions import col

    df = sc.parallelize([
        ('2015-07-02T11:22:21.050Z', ),
        ('2016-03-20T21:00:00.000Z', )
    ]).toDF(("d_str", ))

    df_casted = df.select("*",
        col("d_str").cast("date").alias("dt"),
        col("d_str").cast("timestamp").alias("ts"))

This saves one round trip between the JVM and Python. There are also several ways to approach the second part. Date only:

    from pyspark.sql.functions import current_date, datediff, unix_timestamp

    df_casted.where(datediff(current_date(), col("dt")) < 7)

Timestamps:

    def days(i: int) -> int:
        return 60 * 60 * 24 * i

    df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))

You can also take a look at current_timestamp and date_sub.

Note: I would not use DataFrame.map. Use DataFrame.rdd.map instead; it will save you some work when switching to Spark 2.0+.


I found a way to solve my problem using the Spark SQL API while keeping the dates as strings.

Here is an example:

    from datetime import datetime, timedelta

    last_week = (datetime.today() - timedelta(days=7)).strftime('%Y-%m-%d')
    new_df = df.where(df.date >= last_week)
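The string comparison above presumably works because zero-padded ISO 8601 strings sort lexicographically in the same order as chronologically. A minimal pure-Python sketch of that idea follows; the fixed `now` value, the helper `iso_cutoff` and the sample list are assumptions for illustration, and plain Python string comparison is used as a stand-in for the comparison Spark performs on ASCII strings.

```python
from datetime import datetime, timedelta


def iso_cutoff(now: datetime, days: int = 7) -> str:
    """Return the date `days` before `now` as a 'YYYY-MM-DD' string."""
    return (now - timedelta(days=days)).strftime("%Y-%m-%d")


# Fixed "now" so the example is reproducible (illustrative assumption).
now = datetime(2015, 7, 8, 12, 0, 0)
cutoff = iso_cutoff(now)

dates = [
    "2015-06-30T23:59:59.999Z",
    "2015-07-01T00:00:00.000Z",
    "2015-07-02T11:22:21.050Z",
]

# Character-by-character comparison: a longer string that starts with
# the cutoff counts as greater, so midnight of the cutoff day is kept.
recent = [d for d in dates if d >= cutoff]
```

This only holds when every value uses the same zero-padded format and the same time zone. Note also that datetime.today() returns local time while the stored strings are in UTC, so records close to the boundary may fall on the wrong side.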

Source: https://habr.com/ru/post/1245423/

