Spark DataFrame range join is very slow

I have the following input (in Parquet) for the Spark job:

Person (millions of rows)

    +---------+----------+---------------+---------------+
    | name    | location | start         | end           |
    +---------+----------+---------------+---------------+
    | Person1 | 1230     | 1478630000001 | 1478630000010 |
    | Person2 | 1230     | 1478630000002 | 1478630000012 |
    | Person2 | 1230     | 1478630000013 | 1478630000020 |
    | Person3 | 3450     | 1478630000001 | 1478630000015 |
    +---------+----------+---------------+---------------+

Event (millions of rows)

    +----------+----------+---------------+
    | event    | location | start_time    |
    +----------+----------+---------------+
    | Biking   | 1230     | 1478630000005 |
    | Skating  | 1230     | 1478630000014 |
    | Baseball | 3450     | 1478630000015 |
    +----------+----------+---------------+

and I need to convert it to the following expected result:

    [{
        "name" : "Biking",
        "persons" : ["Person1", "Person2"]
    }, {
        "name" : "Skating",
        "persons" : ["Person2"]
    }, {
        "name" : "Baseball",
        "persons" : ["Person3"]
    }]

In words: the result is a list of events, each with a list of the persons who participated in that event.

A person is considered a participant if

 Person.start < Event.start_time && Person.end > Event.start_time && Person.location == Event.location 

I tried different approaches, but the only thing that actually works is joining the two DataFrames and then grouping by event. However, the join is very slow and does not make use of multiple CPU cores.

Current join code:

    final DataFrame fullFrame = persons.as("persons")
        .join(events.as("events"),
            col("persons.location").equalTo(col("events.location"))
                .and(col("events.start_time").geq(col("persons.start")))
                .and(col("events.start_time").leq(col("persons.end"))),
            "inner");
    // count to force an action
    fullFrame.count();
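For completeness, a minimal sketch of the grouping step mentioned above (not shown in the question) might look like the following. It assumes collect_list from org.apache.spark.sql.functions, which in Spark 1.6 is backed by a Hive UDAF and therefore needs a HiveContext, plus the column names from the tables above:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.collect_list;

    // Sketch: group the joined frame by event and collect the participant names.
    final DataFrame result = fullFrame
        .groupBy(col("events.event"))
        .agg(collect_list(col("persons.name")).as("persons"));

    result.show();

Calling toJSON() on result then yields one JSON document per event, close to the expected output.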

I use Spark Standalone and Java, if that matters.

Does anyone have an idea how to solve this problem with Spark 1.6.2?

1 answer

Range joins are executed as a cross product followed by a filter step. A potentially better solution is to broadcast the (presumably smaller) events table, then map over the persons table: inside the map, check the join condition and produce the corresponding result.
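A minimal sketch of that idea in Java, assuming the events table is small enough to collect and broadcast, and that location, start, end and start_time are stored as longs (the variable names and structure here are illustrative, not taken from the answer):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.Row;

    import scala.Tuple2;

    // Reuse the SparkContext behind the DataFrames.
    final JavaSparkContext jsc =
        new JavaSparkContext(persons.sqlContext().sparkContext());

    // Collect the (presumably smaller) events table and broadcast it to all executors.
    final Broadcast<List<Row>> broadcastEvents = jsc.broadcast(events.collectAsList());

    // Map over the persons table; for each person emit an (event, name) pair
    // for every broadcast event that satisfies the join condition.
    final JavaRDD<Tuple2<String, String>> matches = persons.toJavaRDD()
        .flatMap(new FlatMapFunction<Row, Tuple2<String, String>>() {
            @Override
            public Iterable<Tuple2<String, String>> call(Row person) {
                final String name = person.getAs("name");
                final long location = person.getAs("location");
                final long start = person.getAs("start");
                final long end = person.getAs("end");
                final List<Tuple2<String, String>> out = new ArrayList<>();
                for (Row event : broadcastEvents.value()) {
                    final long eventLocation = event.getAs("location");
                    final long startTime = event.getAs("start_time");
                    if (location == eventLocation && start < startTime && end > startTime) {
                        out.add(new Tuple2<>(event.<String>getAs("event"), name));
                    }
                }
                return out;
            }
        });

    // Group by event to get the list of participants per event.
    final JavaPairRDD<String, Iterable<String>> personsByEvent =
        JavaPairRDD.fromJavaRDD(matches).groupByKey();

The check inside the loop uses the strict < / > condition from the question.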



