Dataflow Batch Job Stuck in GroupByKey.create()

Question

I have a batch Dataflow pipeline that I have run without issue many times on a subset of our data, with approximately 150k rows of input. I have now attempted to run it on our full dataset of approximately 300M rows. A crucial part of the pipeline performs a GroupByKey of the input records, resulting in (I believe) ~100M keys.

The relevant part of the pipeline looks like this:

// Results in map of ~10k elements, 160MB in size
PCollectionView<Map<String, Iterable<Z>>> sideData = ...

...

.apply(ParDo.named("Group into KV").of(
    new DoFn<T, KV<String, T>>() { ... }
))
.apply("GBK", GroupByKey.create())
.apply(ParDo.named("Merge Values").withSideInputs(sideData).of(
    new DoFn<KV<String, Iterable<T>>, V>() { ... }
))

I have run this pipeline twice, and each time the job has stalled after running fine for more than 16 hours. The first time I ran it using 10 n1-highmem-8 instances and the second time using 6 n1-highmem-16 instances.

I can tell from the Dataflow job console that the Group into KV ParDo completes just fine and outputs 101,730,100 elements with a total size of 153.67 GB. The step detail for the GBK transform says that 72,091,846 and 72,495,353 elements were added in the first and second attempts, respectively. At this point the GBK transform is still in the running phase, but the CPU on all the machines drops to zero and the pipeline is effectively stalled. All later stages in the pipeline stop incrementing their element counts. I can SSH into the machines and look at the various logs under /var/log/dataflow/, but there doesn't appear to be anything out of the ordinary. There are no errors in the cloud console, and the GC logs don't seem to indicate memory issues.

At this point I'm at a bit of a loss as to what to do next. I have read that using a Combiner instead of a GroupByKey could yield better scalability, and with a little refactoring I could make the merge code commutative so that a Combiner would be an option. I'm somewhat hesitant to attempt that, because each time I have tried to run this pipeline it has cost me ~$250 in wasted cloud compute time.
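
For reference, a minimal sketch of what that refactor could look like, assuming the merge of two T values really is commutative and associative (Combine may apply the function to partial groups of a key's values). The mergeValues() helper and the "Apply Side Data" step are hypothetical names, not from the original pipeline; the side-input lookup from Merge Values would move to a ParDo applied after the combine:

// Hypothetical sketch, not the original pipeline: replaces the GBK +
// "Merge Values" ParDo with a Combine.perKey. mergeValues() stands in for a
// pairwise merge that must be commutative and associative, because the
// function below can be invoked on partial subsets of a key's values.
PCollection<KV<String, T>> keyed = ...; // output of "Group into KV"

keyed
    .apply("Combine per key", Combine.<String, T>perKey(
        new SerializableFunction<Iterable<T>, T>() {
          @Override
          public T apply(Iterable<T> values) {
            T merged = null;
            for (T value : values) {
              merged = (merged == null) ? value : mergeValues(merged, value);
            }
            return merged;
          }
        }))
    .apply(ParDo.named("Apply Side Data").withSideInputs(sideData).of(
        new DoFn<KV<String, T>, V>() { ... }
    ));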

My questions are:

  • What are the recommended ways to try to figure out what the pipeline is doing when it appears to be stalled? Should I do a kill -QUIT <pid> on the java process to get a stack trace, and if so, where would it go?

  • Does anyone have any theories about why this pipeline would suddenly stall without any errors or warnings?

Job IDs for the runs above:

  • 2016-09-02_10_56_46-679417604641749922
  • 2016-09-03_15_52_32-6019142684328835835

Answer

It looks like one worker may be either stuck or taking a long time to run the DoFn code after the GroupByKey. The most likely cause of this is a "hot key" (having significantly more values than other keys). You could add an Aggregator to the DoFn and report the size of the Iterable while running, something like this:

private static class MyDoFn<T, V> extends DoFn<KV<String, Iterable<T>>, V> {

  private static final Logger LOG =
      LoggerFactory.getLogger(MyDoFn.class);

  // Tracks the largest number of values seen for any single key;
  // shows up with the job's counters in the Dataflow monitoring UI.
  private final Aggregator<Long, Long> maxIterableSize =
      createAggregator("maxIterableSize", new Max.MaxLongFn());

  @Override
  public void processElement(ProcessContext c) {
    long numElements = 0;
    for (T value : c.element().getValue()) {
      maxIterableSize.addValue(++numElements);
      if (numElements == 100) {
        LOG.warn("Key {} has > 100 values", c.element().getKey());
      }
      ... // the rest of your code inside the loop
    }
  }
}

The above will add a counter showing the maximum number of elements on a single key, and also report to Cloud Logging any key that has more than 100 values (feel free to adjust the threshold as seems reasonable -- the single hot key likely has many more elements than any other key).

The other possibility is that there is something in the code for that DoFn that is either hanging or really slow on some specific set of data. You could try connecting to the worker that is processing this one item and seeing what it is working on (using kill -QUIT <pid> as you mentioned).
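
If grabbing a thread dump over SSH is awkward, one possible alternative (my own untested sketch, not from the original answer) is to log a dump from inside the worker JVM using only standard Java APIs, for example from a watchdog that fires when an element has been processing for a long time. StuckWorkLogger and logAllStacks are made-up names:

// Hypothetical helper, not from the original answer: logs a full thread dump
// from inside the worker JVM as an alternative to sending kill -QUIT to the
// process from an SSH session.
import java.util.Map;

public class StuckWorkLogger {
  private static final org.slf4j.Logger LOG =
      org.slf4j.LoggerFactory.getLogger(StuckWorkLogger.class);

  /** Logs the stack trace of every live thread in this JVM. */
  public static void logAllStacks() {
    StringBuilder sb = new StringBuilder("Thread dump:\n");
    for (Map.Entry<Thread, StackTraceElement[]> entry
        : Thread.getAllStackTraces().entrySet()) {
      sb.append(entry.getKey()).append('\n');
      for (StackTraceElement frame : entry.getValue()) {
        sb.append("    at ").append(frame).append('\n');
      }
    }
    LOG.warn(sb.toString());
  }
}

Anything logged this way through SLF4J ends up with the rest of the worker logs in Cloud Logging, so you can inspect it without reconnecting to the machine.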
