Scalding: flattening fields after groupBy

I have seen this question: "Scalding: How to retain the other field after a groupBy('field){_.size}?"

This is a real pain and a mess compared to Apache Pig... What am I doing wrong? Can I do something like GENERATE(FLATTEN()) in Pig?

I'm confused. Here is my Scalding code:

    def takeTop(topAmount: Int): Pipe =
      self
        .groupBy(person1) { _.sortedReverseTake[Long](activityCount -> top, topAmount) }
        .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))

And my test:

    "Take top 3" should "return most active pairs" in {
      Given {
        List(
          (1, 13, 7), (1, 13, 8), (1, 12, 9), (1, 11, 10),
          (2, 20, 21), (2, 20, 22)
        ) withSchema (person1, person2, activityCount)
      } When {
        pipe: RichPipe => pipe.takeTop(3)
      } Then {
        buffer: mutable.Buffer[(Long, Long, Long)] =>
          println(buffer.toList)
          buffer.toList.size should equal(5)
          println(buffer.toList)
          buffer.toList should contain (1, 11, 10)
          buffer.toList should contain (1, 12, 9)
          buffer.toList should contain (1, 13, 8)
          buffer.toList should not contain (1, 13, 7)
          buffer.toList should contain (2, 20, 21)
          buffer.toList should contain (2, 20, 22)
      }
    }

And I get an exception at runtime:

    14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
    cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
        at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
        at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
        at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
        at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
        at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
        at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
        at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
        at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
        at cascading.flow.stream.Duct.complete(Duct.java:81)
        at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
        at cascading.flow.stream.Duct.complete(Duct.java:81)
        at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
        at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
        at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
        at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        ... 23 more
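The `Caused by` line fits what the first version asks for: `sortedReverseTake[Long](activityCount -> top, topAmount)` keeps only the `activityCount` values, so each group's `top` field holds a list of `Long`s, and `flattenTo[(Long, Long, Long)]` then tries to read each of those `Long`s as a `Tuple3`. The sketch below is a plain-Scala model of that, not Scalding code: the collection calls only imitate the pipeline, and the names `WhyItFails` and `topActivityOnly` are hypothetical. It reproduces a `ClassCastException` of the same kind.

```scala
object WhyItFails {
  // Test rows from the question: (person1, person2, activityCount)
  val rows: List[(Long, Long, Long)] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L), (1L, 11L, 10L),
    (2L, 20L, 21L), (2L, 20L, 22L))

  // Hypothetical stand-in for
  //   groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, n) }:
  // only the activityCount values end up in each group's list.
  def topActivityOnly(n: Int): Map[Long, List[Long]] =
    rows.groupBy(_._1).map { case (p1, group) =>
      p1 -> group.map(_._3).sorted(Ordering[Long].reverse).take(n)
    }

  def main(args: Array[String]): Unit = {
    val top = topActivityOnly(3)
    println(top(1L)) // List(10, 9, 8): bare Longs, the person fields are gone

    // Stand-in for flattenTo[(Long, Long, Long)]: treat each element as a Tuple3.
    val elem: Any = top(1L).head // a boxed java.lang.Long at runtime
    try {
      val triple = elem.asInstanceOf[(Long, Long, Long)]
      println(triple._1)
    } catch {
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }
  }
}
```

In other words, the sort key and the flattened payload are the same single field, so there is nothing for `person1` and `person2` to be filled from.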

What am I doing wrong?

UPDATE:

I did it as follows:

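    def takeTop(topAmount: Int): Pipe =
      self
        .groupBy(person1) {
          // activityCount comes first: tuples compare element by element,
          // so the reverse sort ranks rows by activityCount
          _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount)
        }
        // emit one row per kept tuple, in the tuple's field order
        .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
        // restore the original field order
        .project(person1, person2, activityCount)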

The test passes, but I'm not sure this is a good approach...
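A plain-Scala model of what the working version computes may help judge it. It is a sketch using ordinary collections, not Scalding (`TakeTopModel` is a hypothetical name): per `person1`, reorder each row so `activityCount` comes first, reverse-sort the tuples, keep `topAmount` of them, and restore the original field order (what `flattenTo` followed by `.project` does in the pipeline). Because Scala tuples compare element by element, putting `activityCount` first is what makes the sort rank by activity, with `person1` and `person2` only breaking ties.

```scala
object TakeTopModel {
  type Row = (Long, Long, Long) // (person1, person2, activityCount)

  // Plain-collections model of the working pipeline (not Scalding code).
  def takeTop(rows: List[Row], topAmount: Int): List[Row] =
    rows
      .groupBy(_._1) // groupBy(person1)
      .toList
      .flatMap { case (_, group) =>
        group
          // sortedReverseTake on (activityCount, person1, person2): tuples compare
          // element by element, so activityCount decides the order first
          .map { case (p1, p2, count) => (count, p1, p2) }
          .sorted(Ordering[(Long, Long, Long)].reverse)
          .take(topAmount)
          // flattenTo + project: back to (person1, person2, activityCount)
          .map { case (count, p1, p2) => (p1, p2, count) }
      }

  def main(args: Array[String]): Unit = {
    val rows: List[Row] = List(
      (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L), (1L, 11L, 10L),
      (2L, 20L, 21L), (2L, 20L, 22L))
    // Sorted only to make the printed order deterministic
    println(takeTop(rows, 3).sorted)
    // List((1,11,10), (1,12,9), (1,13,8), (2,20,21), (2,20,22))
  }
}
```

The model yields the same five rows the test expects, and `(1, 13, 7)` is the one row dropped.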

Answer:
    def takeTop(topAmount: Int): Pipe =
      self
        .groupBy(person1) {
          _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount)
        }
        .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
        .project(person1, person2, activityCount)

This works; I did not find a better approach.
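One possible way to avoid the field reordering and the extra `.project`, sketched here in plain Scala: sort the rows in their original field order with an explicit `Ordering` that looks only at `activityCount`. Scalding's fields-API `sortedReverseTake` appears to take an implicit `Ordering[T]`, so an ordering like `byActivity` might be passed to it directly; that is an assumption, so check the signature in your Scalding version before relying on it. `ByActivityModel` and `byActivity` are hypothetical names, and the code below uses ordinary collections, not Scalding.

```scala
object ByActivityModel {
  type Row = (Long, Long, Long) // (person1, person2, activityCount)

  // Compare rows by activityCount (the third element) only; field order is untouched.
  val byActivity: Ordering[Row] = Ordering.by[Row, Long](_._3)

  // Plain-collections model: top-N rows per person1, highest activityCount first.
  def takeTop(rows: List[Row], topAmount: Int): List[Row] =
    rows.groupBy(_._1).toList.flatMap { case (_, group) =>
      group.sorted(byActivity.reverse).take(topAmount)
    }

  def main(args: Array[String]): Unit = {
    val rows: List[Row] = List(
      (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L), (1L, 11L, 10L),
      (2L, 20L, 21L), (2L, 20L, 22L))
    // Sorted only to make the printed order deterministic
    println(takeTop(rows, 3).sorted)
    // List((1,11,10), (1,12,9), (1,13,8), (2,20,21), (2,20,22))
  }
}
```

The trade-off: with `byActivity`, rows that tie on `activityCount` are not ranked by `person1` and `person2` as they are when the whole `(activityCount, person1, person2)` tuple is compared, so which tied row survives the cut can depend on input order.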


Source: https://habr.com/ru/post/948867/

