Exceptions in Google Cloud Dataflow pipelines from BigQuery to Cloud Bigtable


Problem description

When executing Dataflow pipelines, we see these exceptions every once in a while. Is there anything we can do about them? We have a fairly simple flow that reads data from a BigQuery query and populates data in BigTable.

Also, what happens to the data inside the pipeline? Is it reprocessed? Or is it lost in transit to BigTable?

    CloudBigtableIO.initializeForWrite(p);
    p.apply(BigQueryIO.Read.fromQuery(getQuery()))
     .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
           @Override
           public void processElement(ProcessContext c) {
             // Convert each BigQuery row into an HBase mutation and emit it.
             Mutation output = convertDataToRow(c.element());
             c.output(output);
           }
         }))
     .apply(CloudBigtableIO.writeToTable(config));


private static Mutation convertDataToRow(TableRow element) {
     LOG.info("element: "+ element);
     LOG.info("BASM_segment_id: "+ element.get("BASM_segment_id"));
     if(element.get("BASM_AID") != null){
         Put obj = new Put(getRowKey(element).getBytes()).addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String)element.get("BAS_category")).getBytes() );
                obj.addColumn(USER_FAMILY, "AID".getBytes(), ((String)element.get("BASM_AID")).getBytes());
         if(element.get("BASM_segment_id") != null){
                obj.addColumn(SEGMENT_FAMILY, "segment_id".getBytes(), ((String)element.get("BASM_segment_id")).getBytes());
         }
         if(element.get("BAS_sub_category") != null){
                obj.addColumn(SEGMENT_FAMILY, "sub_category".getBytes(), ((String)element.get("BAS_sub_category")).getBytes());
         }
         if(element.get("BAS_name") != null){
                obj.addColumn(SEGMENT_FAMILY, "name".getBytes(), ((String)element.get("BAS_name")).getBytes());
         }
         if(element.get("BAS_description") != null){
                obj.addColumn(SEGMENT_FAMILY, "description".getBytes(), ((String)element.get("BAS_description")).getBytes());
         }
         if(element.get("BAS_last_compute_day") != null){obj.addColumn(USER_FAMILY, "Krux_User_id".getBytes(), ((String)element.get("BASM_krux_user_id")).getBytes());
                obj.addColumn(SEGMENT_FAMILY, "last_compute_day".getBytes(), ((String)element.get("BAS_last_compute_day")).getBytes());
         }
         if(element.get("BAS_type") != null){
                obj.addColumn(SEGMENT_FAMILY, "type".getBytes(), ((String)element.get("BAS_type")).getBytes());
         }      
         if(element.get("BASM_REGID") != null){
                obj.addColumn(USER_FAMILY, "REGID".getBytes(), ((String)element.get("BASM_REGID")).getBytes() );
         }
        return obj;
     }else{
         return null;
     }
    }

Following is the exception which we are getting:

2016-08-22T21:47:33.469Z: Error: (84707221e08b977b): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449)
    at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138)
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190)
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Caused by:

org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time,
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389)
    at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274)
    at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtableIO.java:966)

Exception copied from Dataflow console

(7e75740160102c05): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35) at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:368) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:287) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:449) at com.nytimes.adtech.dataflow.pipelines.BigTableSegmentData$2.processElement(BigTableSegmentData.java:70) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:167) at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:71) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:288) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221) at 
com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:193) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:173) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:160) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: StatusRuntimeException: 1 time, at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:389) at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:274) at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableBufferedWriteFn.processElement(CloudBigtableIO.java:966)

2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: org.apache.hadoop.... 2016-08-23 (13:17:54) java.lang.RuntimeException:

Thanks in Advance

Solution

We spoke offline. The problem here is that you have too many Dataflow workers compared to the number of Cloud Bigtable nodes in your cluster. You need to change that ratio by either reducing Dataflow workers or contacting our team to increase your Cloud Bigtable resources.
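
A minimal sketch of capping the worker count from the job's main method, assuming the Dataflow 1.x DataflowPipelineOptions already used by this pipeline; the worker counts (4 and 8) are illustrative placeholders to be sized against your Bigtable cluster, not recommended values:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    // Cap the Dataflow worker pool so the write load stays proportional to the
    // Bigtable cluster size. Both counts below are illustrative placeholders.
    DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setNumWorkers(4);     // initial worker count (placeholder)
    options.setMaxNumWorkers(8);  // autoscaling ceiling (placeholder)

    Pipeline p = Pipeline.create(options);
    // ... build the BigQuery -> Bigtable pipeline shown above ...

The same limits can also be passed on the command line as --numWorkers and --maxNumWorkers when launching the job.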

Bigtable was performing admirably relative to the number of Cloud Bigtable nodes you had, but the load from Dataflow was too high for it to handle reliably.

You can view your usage in the "CPU Usage" graph in the Google Cloud console. Anything over 80% of your capacity is likely to cause problems. If you get more Bigtable Quota, you can increase the number of nodes you have before you run the Dataflow job, and reduce it after the job is done. For example, Scio does that.
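
If you script that resize rather than doing it by hand, a rough sketch is below. It assumes the BigtableClusterUtilities helper from the Cloud Bigtable client libraries; the exact class and method signatures vary between client versions, so treat the calls as illustrative, and the project, instance, cluster, zone, and node counts are placeholders:

    import com.google.cloud.bigtable.grpc.BigtableClusterUtilities;

    public class ResizeBigtableForJob {
      public static void main(String[] args) throws Exception {
        // Grow the cluster before the heavy Dataflow write job, shrink it afterwards.
        // All identifiers and sizes are placeholders; check your client version for
        // the exact BigtableClusterUtilities API.
        BigtableClusterUtilities util =
            BigtableClusterUtilities.forInstance("my-project", "my-instance");
        util.setClusterSize("my-cluster", "us-east1-b", 10);  // scale up before the job

        // ... launch the Dataflow job here and wait for it to finish ...

        util.setClusterSize("my-cluster", "us-east1-b", 3);   // scale back down afterwards
        util.close();
      }
    }

Recent gcloud releases expose the same operation via gcloud bigtable clusters update, which can be wrapped in a launch script instead.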

==

Regarding "Also what happens to data inside the pipeline? Is it reprocessed? Or is it lost in transit to BigTable?":

Dataflow tries to send the data to Bigtable again. In those cases, Dataflow's retry mechanism will correct for temporary issues.

Unfortunately, when the problem turns out to be Cloud Bigtable overload, the retries send even more traffic to Bigtable and thereby exacerbate the problem.
