You cannot do it nicely, I'm afraid. Think about how it works under the hood: the framework splits the data to be processed into pieces and sends them to different processes; each process counts its own piece, and a single reducer adds the counts up at the end. While each process is counting, it does not know the total size, so it cannot add the field. The only way is to go back and attach the total to the data once it is known, i.e. with a join.
If each group fits in memory (and you can configure how much memory is available), you can:
Tsv(args("input"), ('id1, 'id2)) .groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list)) .flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) { case (list, size) => list.map(record => (record._1, record._2, size)) } .write(Tsv(args("output")))
But if your system does not have enough memory, you will have to do an expensive join.
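As a rough sketch of that join-based route (the job name AddGroupSizeJob and the temporary field 'id2_ are my own choices, not from the question): first compute one row per id2 with its group size, then join it back onto the original records:

import com.twitter.scalding._

class AddGroupSizeJob(args: Args) extends Job(args) {
  val records = Tsv(args("input"), ('id1, 'id2)).read

  // one row per id2 holding the size of that group
  val sizes = records
    .groupBy('id2) { _.size('size) }
    .rename('id2 -> 'id2_)                 // keep field names distinct across the two sides of the join

  records
    .joinWithSmaller('id2 -> 'id2_, sizes) // sizes has one row per group, so it is the smaller side
    .discard('id2_)
    .write(Tsv(args("output")))
}

This pays for an extra shuffle, which is the cost mentioned above, but it never needs to hold a whole group in memory.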
Note: you can use Tsv instead of TextLine followed by a mapTo and a split.
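For comparison, the TextLine version of the same source would read raw lines and split them itself; a minimal sketch, assuming tab-separated input:

TextLine(args("input"))
  .read
  // parse each raw line into the two id fields
  .mapTo('line -> ('id1, 'id2)) { line: String =>
    val parts = line.split("\t")
    (parts(0), parts(1))
  }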