You can do a self-join. First get the groups:
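// One row per distinct (col1, col2, col3) group. Passing the grouping columns to agg() would duplicate them in the result.
val groups = df.select($"col1", $"col2", $"col3").distinct()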
Then you can join this back to the original DataFrame:
// Rename the group columns so the join condition is unambiguous; <=> is null-safe equality.
val joinedDF = groups
  .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3")
  .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3")
Although this gives you exactly the same data you started with (plus three redundant columns), you can do another join to add a column with the MongoDB document ID of the (col1, col2, col3) group associated with each row.
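That second join could look roughly like the sketch below. It assumes the group documents are already available as a DataFrame with one row per group and an ID column. The names `groupDocs`, `group_doc_id`, `withGroupIds`, and `value`, along with the sample rows and placeholder IDs, are made up for illustration; how you actually load the documents from MongoDB is not shown here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object GroupIdJoin {
  // Attach each group's document ID to every row of df.
  // Assumes groupDocs has columns col1, col2, col3 and group_doc_id (hypothetical name).
  def withGroupIds(df: DataFrame, groupDocs: DataFrame): DataFrame = {
    // Rename the key columns so the join condition is unambiguous.
    val g = groupDocs.select(
      col("col1") as "g_col1",
      col("col2") as "g_col2",
      col("col3") as "g_col3",
      col("group_doc_id"))

    // Left join keeps rows whose group has no document; <=> treats null keys as equal.
    df.join(
        g,
        col("col1") <=> col("g_col1") and
          col("col2") <=> col("g_col2") and
          col("col3") <=> col("g_col3"),
        "left")
      .drop("g_col1", "g_col2", "g_col3") // remove the redundant key copies
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("group-id-join")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the original df.
    val df = Seq(
      ("a", "x", 1, 10.0),
      ("a", "x", 1, 20.0),
      ("b", "y", 2, 30.0)
    ).toDF("col1", "col2", "col3", "value")

    // Hypothetical group documents; the IDs are placeholders, not real ObjectIds.
    val groupDocs = Seq(
      ("a", "x", 1, "id-001"),
      ("b", "y", 2, "id-002")
    ).toDF("col1", "col2", "col3", "group_doc_id")

    withGroupIds(df, groupDocs).show()
    spark.stop()
  }
}
```

The join is a left join so that rows whose group has no document yet are kept with a null ID. Switch to the default inner join if you would rather drop them.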
Anyway, in my experience, joins and self-joins are the way to handle complicated things in DataFrames.