Get unexpected exception when using Dataflow


Question

When I use Dataflow with these steps: read from BigQuery, convert each table row to a JSON string, and insert into Elasticsearch (7.5.2), it appears to work fine with ~100k records, but on the real data (8M records, ~65 GB) Dataflow throws an exception after roughly 300k insertions.
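For reference, the pipeline is roughly of this shape. This is a simplified sketch rather than the actual job: the table, index, host, and class names are placeholders, and the exact ElasticsearchIO signature varies a little between Beam versions.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class BigQueryToElasticsearch {

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        p.apply("ReadFromBigQuery",
                // placeholder table reference
                BigQueryIO.readTableRows().from("my-project:my_dataset.my_table"))
         .apply("ToJson",
                MapElements.into(TypeDescriptors.strings())
                    .via(BigQueryToElasticsearch::toJson))
         .apply("WriteToElasticsearch",
                ElasticsearchIO.write().withConnectionConfiguration(
                    // placeholder hosts/index; the type argument ("_doc") is only
                    // required by older Beam/Elasticsearch version combinations
                    ElasticsearchIO.ConnectionConfiguration.create(
                        new String[] {"http://10.128.0.10:9200"}, "my-index", "_doc")));

        p.run();
      }

      /** Serializes a BigQuery TableRow (a Map) into a JSON string. */
      private static String toJson(TableRow row) {
        try {
          return new ObjectMapper().writeValueAsString(row);
        } catch (Exception e) {
          throw new RuntimeException("Failed to serialize row to JSON", e);
        }
      }
    }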

Error message from worker: java.lang.RuntimeException: unexpected
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:104)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntries(BatchingShuffleEntryReader.java:125)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.fillEntriesIfNeeded(BatchingShuffleEntryReader.java:119)
    org.apache.beam.runners.dataflow.worker.util.common.worker.BatchingShuffleEntryReader$ShuffleReadIterator.hasNext(BatchingShuffleEntryReader.java:84)
    org.apache.beam.runners.dataflow.worker.util.common.ForwardingReiterator.hasNext(ForwardingReiterator.java:63)
    org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator.advance(GroupingShuffleEntryIterator.java:109)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.advance(GroupingShuffleReader.java:272)
    org.apache.beam.runners.dataflow.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.start(GroupingShuffleReader.java:266)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:305)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}). Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:196)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2312)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:101)
    ... 21 more
Caused by: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out when customerdataworker-khanhn-02170116-o318-harness-8pqr talking to customerdataworker-khanhn-02170116-o318-harness-f8zt:12346. Server unresponsive (ping error: Deadline Exceeded, {"created":"@1581934551.886578453","description":"Deadline Exceeded","file":"third_party/grpc/src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}). Typically one can self manage this issue, please read: https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout
    org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)
    org.apache.beam.runners.dataflow.worker.ChunkingShuffleBatchReader.read(ChunkingShuffleBatchReader.java:58)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:70)
    org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader$1.load(CachingShuffleBatchReader.java:66)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    ... 27 more

I also applied the configuration recommended at https://cloud.google.com/dataflow/docs/guides/common-errors#tsg-rpc-timeout (increase the disk size and the number of workers), but it still does not seem to work. My current configuration:

--runner=DataflowRunner \
      --numWorkers=10 \
      --maxNumWorkers=20 \
      --diskSizeGb=150 \
      --workerMachineType=n1-standard-1 \
      --region=asia-east1
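For completeness, I pass these flags to the job roughly like this (a sketch; the project, bucket, and main class are placeholders for my actual values):

    mvn compile exec:java \
      -Dexec.mainClass=com.example.BigQueryToElasticsearch \
      -Dexec.args="--project=my-project \
          --tempLocation=gs://my-bucket/temp \
          --runner=DataflowRunner \
          --numWorkers=10 \
          --maxNumWorkers=20 \
          --diskSizeGb=150 \
          --workerMachineType=n1-standard-1 \
          --region=asia-east1"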

--- Update 1: worker log from Stackdriver (screenshot not reproduced here).

Answer

Thanks for the replies, I have solved the problem. The root cause was the firewall rules applied to the VM instances created by Dataflow: we have to allow connections between those workers.
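To make this concrete: the DEADLINE_EXCEEDED error above is worker-to-worker shuffle traffic on port 12346 being blocked. A firewall rule along the following lines (the rule and network names are placeholders for my setup) lets the Dataflow worker VMs, which carry the `dataflow` network tag, reach each other on the shuffle ports, as described in the Dataflow firewall documentation:

    gcloud compute firewall-rules create allow-dataflow-shuffle \
        --network=my-network \
        --action=allow \
        --direction=ingress \
        --source-tags=dataflow \
        --target-tags=dataflow \
        --rules=tcp:12345-12346

Once the workers can reach each other on those ports, the shuffle RPC timeouts should stop.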

