First, create a sample DataFrame and collect the aggregates you need (the minimum and maximum of col1 and the maximum of col3) in a single pass:
from pyspark.sql import functions as F
df = spark.createDataFrame([
(2.1729247374294496, 3.558069532647046, 6.607603368496324, 1),
(0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0),
(0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3),
(2.608497168338446, 3.529397129549324, 0.373034222141551, 2)
], ("col1", "col2", "col3", "x"))
min1, max1, max3 = df.select(F.min("col1"), F.max("col1"), F.max("col3")).first()
Then build the y column with when / otherwise and attach it:
y = (F.when(F.col("col3") == max3, "K")
.when(F.col("col1") == max1, "Z")
.when(F.col("col1") == min1, "U")
.otherwise("I"))
df_with_y = df.withColumn("y", y)
df_with_y.show()
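With this sample data the row with x = 1 gets "K" (largest col3), x = 2 gets "Z" (largest col1), x = 0 gets "U" (smallest col1), and x = 3 falls through to "I".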
If you also need to combine the new y with another DataFrame keyed by x, select the two columns and join:
df_with_y.select("x", "y").join(df2, ["x"])
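df2 above stands for whatever other DataFrame you want to enrich; it is not defined in this answer, so here is a minimal sketch with a made-up df2 just to show the join end to end:

# Hypothetical second DataFrame keyed by x; replace it with your real one.
df2 = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "d")], ("x", "extra"))

# Bring the computed label onto df2 via the shared x key.
df_with_y.select("x", "y").join(df2, ["x"]).show()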
If y already exists but is only partially filled, compute the aggregates over the rows where y is still null and use coalesce so existing values are kept:
df_ = spark.createDataFrame([
(2.1729247374294496, 3.558069532647046, 6.607603368496324, 1, "G"),
(0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0, None),
(0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3, None),
(2.608497168338446, 3.529397129549324, 0.373034222141551, 2, None)
], ("col1", "col2", "col3", "x", "y"))
# Compute the aggregates only over the rows that still need a label.
min1_, max1_, max3_ = (df_.filter(F.col("y").isNull())
                       .select(F.min("col1"), F.max("col1"), F.max("col3"))
                       .first())
y_ = (F.when(F.col("col3") == max3_, "K")
.when(F.col("col1") == max1_, "Z")
.when(F.col("col1") == min1_, "U")
.otherwise("I"))
df_.withColumn("y", F.coalesce(F.col("y"), y_)).show()
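Because coalesce returns the first non-null argument, the row that already has y = "G" is left untouched and only the null rows receive a computed label.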
Finally, exact equality on doubles is fragile once the values come out of any computation, so it is safer to compare against the aggregates with a small tolerance:
threshold = 0.0000001
y_t = (F.when(F.abs(F.col("col3") - max3) < threshold, "K")
.when(F.abs(F.col("col1") - max1) < threshold, "Z")
.when(F.abs(F.col("col1") - min1) < threshold, "U")
.otherwise("I"))
df.withColumn("y", y_t).show()
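If you use the tolerance comparison in several places it can be pulled into a tiny helper. approx_eq below is just a name chosen for this sketch, not a Spark built-in:

def approx_eq(col, value, tol=1e-7):
    # Boolean Column: true when col is within tol of value.
    return F.abs(col - F.lit(value)) < tol

y_t2 = (F.when(approx_eq(F.col("col3"), max3), "K")
        .when(approx_eq(F.col("col1"), max1), "Z")
        .when(approx_eq(F.col("col1"), min1), "U")
        .otherwise("I"))
df.withColumn("y", y_t2).show()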