How do I use PySpark to load a rolling window from daily files?

I have a large number of fairly large daily files stored in blob storage (S3, Azure Data Lake, etc.): data1900-01-01.csv, data1900-01-02.csv, ..., data2017-04-27.csv. My goal is to run an N-day rolling linear regression, but I am having problems with the data-loading aspect. I am not sure how to do this without nested RDDs. The schema of the .csv is the same for each file.

In other words, for each date d_t I need to combine the data x_t with the data (x_t-1, x_t-2, ..., x_t-N).

How can I use PySpark to load an N-day window of these daily files? All the PySpark examples I can find load from one very large file or dataset.

Here is an example of my current code:

dates = [('1995-01-03', '1995-01-04', '1995-01-05'), ('1995-01-04', '1995-01-05', '1995-01-06')]

p = sc.parallelize(dates)
def test_run(date_range):
    dt0 = date_range[-1] #get the latest date
    s = '/daily/data{}.csv'
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    file_list = [s.format(dt) for dt in date_range[:-1]] # Get a window of trailing dates
    df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')
    return 1

p.filter(test_run) 

p.map(test_run) #fails with same error as p.filter

I'm on PySpark version '2.1.0'

I run this in a Jupyter notebook on an Azure HDInsight cluster.

spark here is of type <class 'pyspark.sql.session.SparkSession'>

A more reproducible example is as follows:

p = sc.parallelize([1, 2, 3])
def foo(date_range):
    df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
    return 1
p.filter(foo).count()
1 answer

You'd be better off using DataFrames rather than RDDs. The DataFrame read.csv API accepts a list of paths, like this:

pathList = ['/path/to/data1900-01-01.csv','/path/to/data1900-01-02.csv']
df = spark.read.csv(pathList)

See the documentation for read.csv.

You can build the list of paths to your data files by doing some date arithmetic over the N-day window, for example "path/to/data" + datetime.today().strftime("%Y-%m-%d") + ".csv" (this only gives you today's file name, but working out the dates for the previous N days is not hard).
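
For example, here is a minimal sketch of that date arithmetic, assuming the /daily/data{YYYY-MM-DD}.csv naming from the question (build_window_paths is just an illustrative name, not anything from the Spark API):

from datetime import date, timedelta

def build_window_paths(end_date, n_days, template='/daily/data{}.csv'):
    # paths for end_date plus the n_days preceding it, oldest first
    dates = [end_date - timedelta(days=i) for i in range(n_days, 0, -1)] + [end_date]
    return [template.format(d.strftime('%Y-%m-%d')) for d in dates]

build_window_paths(date(1995, 1, 5), 3)
# ['/daily/data1995-01-02.csv', '/daily/data1995-01-03.csv',
#  '/daily/data1995-01-04.csv', '/daily/data1995-01-05.csv']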

Note that this only works if all the csvs have the same schema, which you say is the case here.

edit: i.e. instead of parallelizing the dates into p and building DataFrames inside test_run2, compute the window of dates (e.g. starting from 1995-01-01) on the driver and do the reads there.

Something along these lines:

# Get the list of dates
date_range = window(dates, N)  # window() here stands for your own date-window helper
s = '/daily/data{}.csv'

dt0 = date_range[-1] # most recent date
df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')

# read the trailing files
file_list = [s.format(dt) for dt in date_range[:-1]]
df1 = spark.read.csv(file_list, header=True, mode='DROPMALFORMED')

r, resid = computeLinearRegression(df0, df1)
r.write.save('/daily/r{}.csv'.format(dt0))
resid.write.save('/daily/resid{}.csv'.format(dt0))
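
To make the loop over all the target dates explicit, here is a rough driver-side sketch (this assumes dates is a sorted list of 'YYYY-MM-DD' strings for the files you actually have, and spark is your existing SparkSession):

s = '/daily/data{}.csv'
N = 5  # length of the trailing window

for i in range(N, len(dates)):
    date_range = dates[i - N:i + 1]  # N trailing dates plus the target date
    dt0 = date_range[-1]
    df0 = spark.read.csv(s.format(dt0), header=True, mode='DROPMALFORMED')
    df1 = spark.read.csv([s.format(dt) for dt in date_range[:-1]],
                         header=True, mode='DROPMALFORMED')
    # run the regression and write the results as above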
