Scalding, flatten fields after groupBy


Question

I have seen this: Scalding: How to retain the other field, after a groupBy('field){.size}?

Compared to Apache Pig, it is a real pain and a mess... What am I doing wrong? Can I do the same thing as GENERATE(FLATTEN()) in Pig?

I'm confused. Here is my Scalding code:

  def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[Long](activityCount -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))

My test:

  "Take top 3" should "return most active pairs" in {
    Given{
      List( (1, 13, 7),
            (1, 13, 8),
            (1, 12, 9),
            (1, 11, 10),
            (2, 20, 21),
            (2, 20, 22)) withSchema (person1, person2, activityCount)
    } When {
      pipe:RichPipe => pipe.takeTop(3)
    } Then {
      buffer: mutable.Buffer[(Long, Long, Long)] =>
      println(buffer.toList)
      buffer.toList.size should equal(5)
      println (buffer.toList)

      buffer.toList should contain (1, 11, 10)
      buffer.toList should contain (1, 12, 9)
      buffer.toList should contain (1, 13, 8)
      buffer.toList should not contain (1, 13, 7)

      buffer.toList should contain (2, 20, 21)
      buffer.toList should contain (2, 20, 22)
  }
  }

And I get an exception at runtime:

14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
    at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
    at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
    at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
    at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 23 more

What am I doing wrong?
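One possible reading of the `Caused by` line, offered as a sketch rather than a confirmed diagnosis: `sortedReverseTake[Long](activityCount -> top, topAmount)` appears to store only the `activityCount` values in `top`, so `flattenTo[(Long, Long, Long)]` then tries to treat each `Long` as a `Tuple3`. The plain-Scala model below uses no Scalding at all; `FlattenCastSketch`, `top` and `flattenAsTriples` are hypothetical names that only imitate that cast.

```scala
object FlattenCastSketch {
  // Assumption: after sortedReverseTake[Long] on activityCount alone,
  // the `top` field for person1 == 1 holds bare counts, not triples.
  val top: List[Any] = List(10L, 9L, 8L)

  // Imitates flattenTo[(Long, Long, Long)]: each element is cast to a Tuple3.
  def flattenAsTriples(values: List[Any]): List[(Long, Long, Long)] =
    values.map(_.asInstanceOf[(Long, Long, Long)])

  def main(args: Array[String]): Unit = {
    // The cast fails because a boxed Long is not a Tuple3.
    val result = scala.util.Try(flattenAsTriples(top))
    println(result.isFailure)
  }
}
```

If this reading is right, the element type given to `sortedReverseTake` and the type given to `flattenTo` have to describe the same value.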

UPD:

I did it this way:

 def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

The test passes, but I'm not sure that it's a good approach...

Recommended answer

  def takeTop(topAmount: Int) :Pipe = self
    .groupBy(person1){ _.sortedReverseTake[(Long,Long, Long)]((activityCount, person1, person2) -> top, topAmount)}
    .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
    .project(person1, person2, activityCount)

This works; I didn't find a better approach.
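To make the shape of this answer easier to follow, here is a minimal model of the same steps using only Scala collections, not Scalding. It assumes the tuple ordering compares `activityCount` first because it is the first element of the triple; `TakeTopSketch`, `Row` and `sample` are hypothetical names, and `sample` copies the rows from the test in the question.

```scala
object TakeTopSketch {
  // (person1, person2, activityCount), as in the test's schema
  type Row = (Long, Long, Long)

  // Rows copied from the test in the question
  val sample: List[Row] = List(
    (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
    (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))

  def takeTop(rows: List[Row], topAmount: Int): List[Row] =
    rows
      .groupBy(_._1)                 // like groupBy(person1)
      .toList
      .flatMap { case (_, group) =>  // flatMap plays the role of flattenTo
        group
          // put activityCount first so it drives the ordering
          .map { case (p1, p2, count) => (count, p1, p2) }
          .sorted(Ordering[(Long, Long, Long)].reverse)
          .take(topAmount)           // like sortedReverseTake(..., topAmount)
      }
      // restore the original column order, like project(...)
      .map { case (count, p1, p2) => (p1, p2, count) }

  def main(args: Array[String]): Unit =
    takeTop(sample, 3).foreach(println)
}
```

In this model the row `(1, 13, 7)` is dropped and five rows remain, which matches what the test expects. The order of the groups in the output is not guaranteed.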
