Filtering a DataFrame for an n-day window on data partitioned by day

Suppose I have a DataFrame named transactions with the following integer columns: year, month, day, timestamp, transaction_id.

In [1]: transactions = ctx.createDataFrame(
   ...:     [(2017, 12, 1, 10000, 1), (2017, 12, 2, 10001, 2), (2017, 12, 3, 10003, 3),
   ...:      (2017, 12, 4, 10004, 4), (2017, 12, 5, 10005, 5), (2017, 12, 6, 10006, 6)],
   ...:     ('year', 'month', 'day', 'timestamp', 'transaction_id'))

In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+

I want to define a function filter_date_range that returns a DataFrame containing the transaction rows that fall within a given date range.

>>> filter_date_range(  
        df = transactions, 
        start_date = datetime.date(2017, 12, 2), 
        end_date = datetime.date(2017, 12, 4)).show()

+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+

Assuming the data is stored as a Hive table partitioned by year, month, day, what is the most efficient way to perform such a filter, including the date arithmetic? I'm looking for a way to do this purely with the DataFrame API, without resorting to transactions.rdd, so that Spark can work out that it only needs to read a subset of the partitions.
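
For concreteness, a version that touches only the partition columns could be sketched as below; the OR over per-day equality clauses is just one illustrative way to express the window, and whether Spark actually prunes partitions from such a predicate is the crux of what I'm asking:

import datetime
from functools import reduce
from operator import or_
from pyspark.sql import functions as F

def filter_date_range(df, start_date, end_date):
    # Build one (year == y AND month == m AND day == d) clause per day in the
    # window and OR them together; every predicate touches only partition columns.
    n = (end_date - start_date).days + 1
    days = [start_date + datetime.timedelta(days=i) for i in range(n)]
    clauses = [
        (F.col("year") == d.year) & (F.col("month") == d.month) & (F.col("day") == d.day)
        for d in days
    ]
    return df.where(reduce(or_, clauses))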

With the table stored as partitioned Parquet, the on-disk layout under /user/hive/warehouse/transactions looks like this:

.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet
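
For reference, a layout like this is roughly what a partitioned Parquet write of the example DataFrame would produce; the call below is a hypothetical illustration, with base_path standing in for the table directory:

# Hypothetical write that yields the partition layout shown above;
# base_path is a placeholder for the table's warehouse directory.
transactions.write.partitionBy("year", "month", "day").parquet(base_path)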

You can compute the partition directories covered by the requested window and point the reader at just those paths:

import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)
n = (end_date - start_date).days + 1

base_path = ...

paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day) 
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()

# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>

As the physical plan shows (PartitionCount: 3), only the three partitions covered by the date range are scanned.
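
Wrapped into the filter_date_range shape the question asks for, the same approach might look like the sketch below; the function signature and the assumption that the table is stored as Parquet under base_path are illustrative rather than part of the original answer:

import datetime

def filter_date_range(spark, base_path, start_date, end_date):
    # Enumerate the partition directories covered by the window and read only
    # those paths; keeping basePath preserves year/month/day as columns.
    n = (end_date - start_date).days + 1
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in (start_date + datetime.timedelta(days=i) for i in range(n))
    ]
    return spark.read.option("basePath", base_path).load(paths)

Calling filter_date_range(spark, base_path, datetime.date(2017, 12, 2), datetime.date(2017, 12, 4)) then produces the same three-partition scan as above.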

