Spark - connect one to many channels of relationship data

Take the following toy example; I have these case classes:

case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

I now have the following specific variables

val ordersDF = Seq(
  Order("or1", "stuff", "shipped"),
  Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"),
  Order("or4", "???", "what?")
).toDS()

val orgsDF = Seq(
  Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))
).toDS()

I would like to end up with, for example, a data point that looks like this: Org("tupper", Array(Joined("or1", "stuff", "shipped"), Joined("or2", "thigns", "delivered"), ...))

I am wondering how to structure my join and filter statements to get there.
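Before reaching for Spark, the target transformation can be modeled with plain Scala collections: flatten each org's order ids, look each id up among the full orders, and group the matches back per org. The `Joined` and `OrgWithOrders` names below are illustrative only; the question does not define them.

```scala
case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

// Hypothetical target types, named for illustration only
case class Joined(id: String, name: String, status: String)
case class OrgWithOrders(name: String, orders: Seq[Joined])

val orders = Seq(
  Order("or1", "stuff", "shipped"),
  Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"),
  Order("or4", "???", "what?")
)
val orgs = Seq(
  Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))
)

// Index the full orders by id, then resolve each org's truncated orders.
val byId = orders.map(o => o.id -> o).toMap
val resolved = orgs.map { org =>
  OrgWithOrders(
    org.name,
    org.ord.flatMap(t => byId.get(t.id)).map(o => Joined(o.id, o.name, o.status))
  )
}
```

This is exactly the shape the Spark answers below reproduce with `explode`, a join on `id`, and a grouping aggregate.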

3 answers

This is how I was able to get the data into the format I wanted. This answer is largely inspired by @ulrich and @Mariusz.

import org.apache.spark.sql.functions._

// Wrap the three columns into a single array column.
val ud = udf((col: String, name: String, status: String) => Seq(col, name, status))

orgsDF
  .select($"name".as("ordName"), explode($"ord.id"))  // explode without an alias yields a column named "col"
  .join(ordersDF, $"col" === $"id")
  .drop($"id")
  .select($"ordName", ud($"col", $"name", $"status").as("order"))
  .groupBy($"ordName")
  .agg(collect_set($"order").as("orders"))
  .show(false)

    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ordName|orders                                                                                                                    |
    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ware   |[WrappedArray(or4, ???, what?), WrappedArray(or3, thingamabobs, never received)]                                          |
    |tupper |[WrappedArray(or1, stuff, shipped), WrappedArray(or2, thigns, delivered), WrappedArray(or3, thingamabobs, never received)]|
    +-------+--------------------------------------------------------------------------------------------------------------------------+
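A variant of the same pipeline replaces the UDF with the built-in `struct` function, which keeps the field names (`id`, `name`, `status`) in the result instead of producing anonymous arrays. This is a sketch, self-contained with a local session for illustration; the column aliases `orgName` and `oid` are my own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

val ordersDF = Seq(
  Order("or1", "stuff", "shipped"),
  Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"),
  Order("or4", "???", "what?")
).toDS()
val orgsDF = Seq(
  Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))
).toDS()

// struct() preserves field names, so the aggregate is array<struct<id,name,status>>
// and can be read back as typed tuples or case classes.
val grouped = orgsDF
  .select($"name".as("orgName"), explode($"ord.id").as("oid"))
  .join(ordersDF, $"oid" === $"id")
  .groupBy($"orgName")
  .agg(collect_list(struct($"id", $"name", $"status")).as("orders"))

val result = grouped.as[(String, Seq[(String, String, String)])].collect().toMap
spark.stop()
```

Because the fields keep their names, `grouped.as[...]` can map straight back to typed values, which is closer to the `Org`-of-`Joined` shape the question asks for.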

A one-to-many relationship is easy to write if you follow these steps:


How about this?

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

orgsDF.select('name, explode('ord))
      .map { case row: Row => (row(0).toString, row(1).toString.filterNot("[]()".contains(_))) }
      .toDF("name", "ord")
      .join(ordersDF.select('id, 'status, 'name.as("name2")), 'ord === 'id)
      .drop("id")
      .select('name, concat('ord, lit(","), 'status, lit(","), 'name2).as("info"))
      .groupBy('name)
      .agg(collect_set('info))
      .show()

which returns:

+------+--------------------+
|  name|   collect_set(info)|
+------+--------------------+
|  ware|[[or3,never recei...|
|tupper|[[or1,shipped,stu...|
+------+--------------------+
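One caveat that applies to both answers (a general Spark point, not specific to this thread): `collect_set` drops duplicate entries and gives no ordering guarantee, while `collect_list` keeps every row. The difference mirrors `distinct` on plain Scala collections:

```scala
// Plain-collection analogue of collect_list vs collect_set.
// The duplicated "or3" line stands in for the same order joined twice.
val infos = Seq(
  "or3,never received,thingamabobs",
  "or3,never received,thingamabobs",
  "or4,what?,???"
)

val asList = infos          // collect_list-like: duplicates kept
val asSet  = infos.distinct // collect_set-like: duplicates dropped
```

If each order id appears at most once per org, as in the sample data here, the two aggregates produce the same elements.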

Source: https://habr.com/ru/post/1660131/

