I have the following input (in Parquet) for the Spark job:
Person (millions of rows)

+---------+----------+---------------+---------------+
| name    | location | start         | end           |
+---------+----------+---------------+---------------+
| Person1 | 1230     | 1478630000001 | 1478630000010 |
| Person2 | 1230     | 1478630000002 | 1478630000012 |
| Person2 | 1230     | 1478630000013 | 1478630000020 |
| Person3 | 3450     | 1478630000001 | 1478630000015 |
+---------+----------+---------------+---------------+

Event (millions of rows)

+----------+----------+---------------+
| event    | location | start_time    |
+----------+----------+---------------+
| Biking   | 1230     | 1478630000005 |
| Skating  | 1230     | 1478630000014 |
| Baseball | 3450     | 1478630000015 |
+----------+----------+---------------+
and I need to convert it to the following expected result:
[{ "name" : "Biking", "persons" : ["Person1", "Person2"] }, { "name" : "Skating", "persons" : ["Person2"] }, { "name" : "Baseball", "persons" : ["Person3"] }]
In words: the result is a list of events, each with the list of persons who participated in that event.
A person is considered a participant if
Person.start <= Event.start_time && Person.end >= Event.start_time && Person.location == Event.location
I have tried different approaches, but the only thing that actually works is joining the two DataFrames and then grouping by event. However, the join is very slow and is not distributed across multiple processor cores.
Current join code:
import static org.apache.spark.sql.functions.col;

final DataFrame fullFrame = persons.as("persons")
    .join(events.as("events"),
        col("persons.location").equalTo(col("events.location"))
            .and(col("events.start_time").geq(col("persons.start")))
            .and(col("events.start_time").leq(col("persons.end"))),
        "inner");

// count, just to have an action that triggers the join
fullFrame.count();
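The grouping step I run after this join is roughly the following (a sketch, not the exact code; in Spark 1.6 collect_list is an alias for the Hive UDAF, so it needs a HiveContext):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

// Group the joined rows by event and collect the participant names.
// Note: in Spark 1.6, collect_list is backed by the Hive UDAF,
// so the DataFrames have to come from a HiveContext.
final DataFrame result = fullFrame
    .groupBy(col("events.event"))
    .agg(collect_list(col("persons.name")).as("persons"))
    .withColumnRenamed("event", "name");

// One JSON object per event, e.g. {"name":"Biking","persons":["Person1","Person2"]}
for (final String json : result.toJSON().toJavaRDD().collect()) {
    System.out.println(json);
}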
I use Spark Standalone and Java, if that matters.
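For completeness, the two DataFrames are loaded from the Parquet input roughly like this (a sketch; the paths are placeholders and sc is the existing JavaSparkContext):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// HiveContext so that collect_list (above) is available; paths are placeholders
final HiveContext sqlContext = new HiveContext(sc.sc());
final DataFrame persons = sqlContext.read().parquet("/data/persons.parquet");
final DataFrame events  = sqlContext.read().parquet("/data/events.parquet");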
Does anyone have an idea how to solve this problem with Spark 1.6.2?