How to add or replace fields in a struct at any nested level?
This input:
val rdd = sc.parallelize(Seq(
  """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""",
  """{"a": {"xX": 3},"b": {"z": 0}}""",
  """{"a": {"XX": 3},"b": {"z": 0}}""",
  """{"a": {"xx": 4},"b": {"z": 0}}"""))
var df = sqlContext.read.json(rdd)
yields the following schema:
root
 |-- a: struct (nullable = true)
 |    |-- XX: long (nullable = true)
 |    |-- xX: long (nullable = true)
 |    |-- xx: long (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)
Then I can do this:
import org.apache.spark.sql.functions._

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))

df = df
  .withColumn("a_xx", coalesce(overlappingNames: _*))
  .dropNestedColumn("a.xX")
  .dropNestedColumn("a.XX")
  .dropNestedColumn("a.xx")
(dropNestedColumn is borrowed from this answer: https://stackoverflow.com/a/4646262/. I am basically looking for the inverse operation.)
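For readers who do not want to follow the link: conceptually, such a helper removes a nested field by rebuilding the parent struct from the remaining sub-fields. The sketch below only illustrates that idea for a single level of nesting; the name dropNestedColumnOneLevel and its signature are mine, not the code from the linked answer.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// Illustrative sketch only: drops "parent.child" (one level deep) by
// rebuilding the parent struct from its remaining sub-fields.
def dropNestedColumnOneLevel(df: DataFrame, path: String): DataFrame = {
  val Array(parent, child) = path.split("\\.", 2)
  val remaining = df.schema(parent).dataType.asInstanceOf[StructType]
    .fieldNames
    .filter(_ != child)                          // keep everything except the dropped field
    .map(name => col(s"$parent.$name").as(name))
  if (remaining.isEmpty) df.drop(parent)         // nothing left: drop the whole struct
  else df.withColumn(parent, struct(remaining: _*))
}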
After running the coalesce/drop snippet above, the schema becomes:
root
 |-- a: struct (nullable = false)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)
 |-- a_xx: long (nullable = true)
Obviously, this does not replace (or add) a.xx inside the struct; it adds a new a_xx field at the root level instead.
I would like to do this instead:
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))

df = df
  .withNestedColumn("a.xx", coalesce(overlappingNames: _*))
  .dropNestedColumn("a.xX")
  .dropNestedColumn("a.XX")
so that it results in this schema:
root
 |-- a: struct (nullable = false)
 |    |-- xx: long (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)
How can I achieve this?
The practical goal here is case-insensitive handling of the column names in the input JSON. The last step would then be simple: collect all column names that overlap (ignoring case) and apply coalesce to each group.
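To make the desired behaviour concrete, here is a rough sketch of the kind of helper I am imagining, limited to a single level of nesting (withNestedColumnOneLevel is a made-up name, not an existing Spark function). What I am really after is the same behaviour generalized to arbitrary nesting depth.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, struct}
import org.apache.spark.sql.types.StructType

// Sketch only, single level of nesting: adds or replaces "parent.child"
// by rebuilding the parent struct with the new column appended.
def withNestedColumnOneLevel(df: DataFrame, path: String, value: Column): DataFrame = {
  val Array(parent, child) = path.split("\\.", 2)
  val kept = df.schema(parent).dataType.asInstanceOf[StructType]
    .fieldNames
    .filter(_ != child)                          // drop the field being replaced, if present
    .map(name => col(s"$parent.$name").as(name))
  df.withColumn(parent, struct((kept :+ value.as(child)): _*))
}

// Intended usage for the example above (a.xX and a.XX would still need to be dropped):
// df = withNestedColumnOneLevel(df, "a.xx",
//        coalesce(col("a.xx"), col("a.xX"), col("a.XX")))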