Joining two DataFrames derived from the same source

I am using the DataFrame API for pyspark (Apache Spark) and I ran into the following problem:

When I join two DataFrames that come from the same source DataFrame, the resulting DF will explode into a huge number of rows. Quick example:

I load a DataFrame with n rows from disk:

 df = sql_context.parquetFile('data.parquet') 

Then I create two DataFrames from this source.

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

Finally, I want to (inner) join them together:

 df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner') 

The key in col1 is unique. The resulting DataFrame should have n rows; instead, it has n*n rows.

This does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
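For reference, a minimal sketch of the load-from-disk case mentioned above, which does not show the problem. The intermediate file names are made up for this illustration; everything else (sql_context, the column names, the Spark 1.3-era saveAsParquetFile / parquetFile calls) is taken from the snippets above:

    # Write the two projections out and read them back in (hypothetical paths).
    df.select('col1', 'col2').saveAsParquetFile('df_one.parquet')
    df.select('col1', 'col3').saveAsParquetFile('df_two.parquet')

    df_one = sql_context.parquetFile('df_one.parquet')
    df_two = sql_context.parquetFile('df_two.parquet')

    # Joining the re-loaded DataFrames gives the expected n rows.
    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')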

Can someone explain why this is happening?

2 answers

If I am reading this correctly, df_two does not have col2:

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

So when you do this:

  df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') 

This should fail. If you meant to say:

  df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner') 

However, the fact that both DataFrames come from the same source should have no effect here. I would advise you to run:

    df_one.show()
    df_two.show()

to make sure the data you are selecting is what you expect.
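For instance, a small sanity-check sketch along those lines; the variable names are taken from the question, and printSchema() and count() are added here only as extra checks:

    df_one.printSchema()   # should list only col1 and col2
    df_two.printSchema()   # should list only col1 and col3

    df_one.show()
    df_two.show()

    # If col1 is really unique, the joined count should match the input counts.
    print df_one.count(), df_two.count(), df_joined.count()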


I see this problem on my larger dataset on Spark 1.3. Unfortunately, in the small, contrived examples I make up, the join works correctly. I suspect there is some underlying bug, perhaps in the steps leading up to the join.

Doing the join (note: DateTime is just a string):

    > join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
    > join.count()
    250000L

This is obviously the full 500 * 500 Cartesian product.
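One way to check that this really is the Cartesian product is to compare the join count with the product of the input counts; a small sketch, assuming (as implied above) that df1 and df2 each have 500 rows:

    > df1.count() * df2.count()   # size of the full Cartesian product
    250000
    > join.count()                # same number: every row paired with every row
    250000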

What works for me is switching to SQL:

    > sqlc.registerDataFrameAsTable(df1, "df1")
    > sqlc.registerDataFrameAsTable(df2, "df2")
    > join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
    > join.count()
    471L

This value looks correct.

Seeing this, I personally will not use pyspark's DataFrame.join() until I understand this difference better.
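As an aside (this is not from either answer above): a workaround that is often suggested for ambiguous self-joins is to alias the two sides and qualify the join columns explicitly. A rough, untested sketch against the column names from the question; whether this resolves correctly on Spark 1.3 would need to be verified:

    from pyspark.sql.functions import col

    # Give each side of the self-join its own name so col1 is no longer ambiguous.
    left = df.select('col1', 'col2').alias('left')
    right = df.select('col1', 'col3').alias('right')

    df_joined = left.join(right, col('left.col1') == col('right.col1'), 'inner')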

