Issue with Google Dataflow using DatastoreIO


Problem Description



I'm trying to read a Datastore table with 300k records from Dataflow (DatastoreIO) and getting the following error from the Datastore API.

400 Bad Request a composite filter must have at least one sub-filter

Code:

Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(options.getKind());
Query query = q.build();

(...)

Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.readFrom(options.getDataset(), query).named("ReadFromDatastore"))...

Error (happens 4 times before exit):

[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all >>>
[INFO] 
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all <<<
[INFO] 
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
Mar 14, 2015 8:58:48 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 41 files. Enable logging at DEBUG level to see which files will be staged.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 41 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 14, 2015 8:59:12 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 40 files cached
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source queryLatestStatisticsTimestamp
INFO: Query for latest stats timestamp of dataset primebus01 took 854ms
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source getEstimatedSizeBytes
INFO: Query for per-kind statistics took 233ms
Dataflow SDK version: manual_build
Mar 14, 2015 8:59:16 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/primebus01/dataflow/job/2015-03-14_04_59_16-991787963429613914
Submitted job: 2015-03-14_04_59_16-991787963429613914
2015-03-14T11:59:17.796Z: (ab0c86e705e7447a): Expanding GroupByKey operations into optimizable parts.
2015-03-14T11:59:17.800Z: (ab0c86e705e749b0): Annotating graph with Autotuner information.
2015-03-14T11:59:17.806Z: (ab0c86e705e74ee6): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-14T11:59:17.812Z: (ab0c86e705e7441c): Fusing consumer GetContent into ReadFromDatastore
2015-03-14T11:59:17.815Z: (ab0c86e705e74952): Fusing consumer CountWords/Count.PerElement/Init into CountWords/ExtractWords
2015-03-14T11:59:17.818Z: (ab0c86e705e74e88): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Read
2015-03-14T11:59:17.820Z: (ab0c86e705e743be): Fusing consumer WriteLines into CountWords/FormatCounts
2015-03-14T11:59:17.822Z: (ab0c86e705e748f4): Fusing consumer CountWords/FormatCounts into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract
2015-03-14T11:59:17.824Z: (ab0c86e705e74e2a): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Write into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput
2015-03-14T11:59:17.826Z: (ab0c86e705e74360): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract into CountWords/Count.PerElement/Sum.PerKey/GroupedValues
2015-03-14T11:59:17.828Z: (ab0c86e705e74896): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial
2015-03-14T11:59:17.830Z: (ab0c86e705e74dcc): Fusing consumer CountWords/ExtractWords into GetContent
2015-03-14T11:59:17.832Z: (ab0c86e705e74302): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial into CountWords/Count.PerElement/Init
2015-03-14T11:59:17.843Z: (ab0c86e705e74d10): Adding StepResource setup and teardown to workflow graph.
2015-03-14T11:59:17.850Z: (ab0c86e705e7477c): Not adding lease related steps.
2015-03-14T11:59:17.864Z: (ac5ca5b613993974): Starting the input generators.
2015-03-14T11:59:17.882Z: (9a0f95eb7a7962f5): Adding workflow start and stop steps.
2015-03-14T11:59:17.884Z: (9a0f95eb7a796623): Assigning stage ids.
2015-03-14T11:59:18.290Z: (eb8131b6a76f5248): Starting worker pool setup.
2015-03-14T11:59:18.295Z: (eb8131b6a76f53ac): Starting 3 workers...
2015-03-14T11:59:18.318Z: S01: (1174d086003eadad): Executing operation CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Create
2015-03-14T11:59:18.345Z: (d91fb5c6a16bad02): Value "CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Session" materialized.
2015-03-14T11:59:18.367Z: S02: (1174d086003ea94c): Executing operation ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum....
2015-03-14T12:00:19.839Z: (9db26953adb2a181): java.io.IOException: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:266)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:239)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:173)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:120)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:129)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:94)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:118)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:599)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:590)
    at com.google.cloud.dataflow.sdk.io.Source$WindowedReaderWrapper.start(Source.java:178)
    at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:257)
    ... 14 more
Caused by: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
    at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
    at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
    at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:630)
    at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:597)
    ... 17 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
a composite filter must have at least one sub-filter
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
    at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
    ... 21 more

2015-03-14T12:00:24.850Z: (7ffa05cd66cd6940): Failed task is going to be retried.


................................


2015-03-14T12:01:02.703Z: (1174d086003eaff0): Executing failure step failure1
2015-03-14T12:01:02.707Z: (1174d086003ea342): Workflow failed. Causes: (ac5ca5b613993837): Map task completion for Step "ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum...." failed. Causes: (9013e6edb4cda414): Task has been attempted unsuccessfully 4 times, the maximum allowed.
2015-03-14T12:01:02.742Z: (157f9c08481a25c5): Stopping the input generators.
2015-03-14T12:01:02.756Z: (697e8bf226b989af): Cleaning up.
2015-03-14T12:01:02.765Z: (697e8bf226b98f98): Tearing down pending resources...
2015-03-14T12:01:02.771Z: (697e8bf226b98581): Starting worker pool teardown.
2015-03-14T12:01:02.776Z: (697e8bf226b9880d): Stopping worker pool...

This brings other questions:

1 - In this case, am I supposed to define a filter?

2 - What criteria does Dataflow use to split the jobs?

3 - Is there an easier way to dump a big table from Datastore?

Thanks.

Solution

I believe I have isolated the problems you're having:

  • There is a bug in the way DatastoreIO queries are split (we use the QuerySplitter; under some conditions - in particular, when the query returns very few results - it can produce invalid queries for the parallel shards, containing an empty composite filter; see the sketch after this list). I have notified the Datastore team about this bug and they are working on it.
  • According to your code snippet, your query simply reads all entities of a certain kind from your dataset. The only way it could trigger the problem above is if the query returns zero results. This is possible if you're using namespaces - unfortunately, QuerySplitter does not currently support namespaces either. The team is considering removing this limitation, but there is no public timeframe.
  • The Dataflow SDK error messages should be more clear about what's going on, and the SDK should detect these errors earlier. I'll fix this.
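
To make the first bullet concrete: the invalid shard queries contain a composite filter with no sub-filters, which the Datastore backend rejects with exactly the 400 in your logs. The sketch below is only an illustration of that shape, using the same v1beta2 DatastoreV1 protos as your snippet; the kind name is a placeholder, and the exact builder calls QuerySplitter uses internally may differ.

import com.google.api.services.datastore.DatastoreV1.CompositeFilter;
import com.google.api.services.datastore.DatastoreV1.Filter;
import com.google.api.services.datastore.DatastoreV1.Query;

// Illustration only: a kind-only query whose filter is a composite filter
// with zero sub-filters. Running it yields "400 Bad Request: a composite
// filter must have at least one sub-filter" - the same error the generated
// shard queries hit.
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName("MyKind");                       // placeholder kind
q.setFilter(Filter.newBuilder()
    .setCompositeFilter(CompositeFilter.newBuilder()
        .setOperator(CompositeFilter.Operator.AND)));       // no addFilter(...) calls
Query invalidQuery = q.build();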

For now, if you are using namespaces, then it seems that you're out of luck in terms of parallelizing the process of reading from Datastore :( If the bulk of your processing happens after that (i.e. if you do a non-trivial amount of work per Datastore entity), then your current best bet would be to write an (essentially non-parallel) pipeline to dump the entities to GCS, and a second pipeline to read the entities from there and process them in parallel.

The first pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(DatastoreIO.readFrom(dataset, query).named("ReadFromDatastore"))
        .apply(ParDo.of(new ConvertEntitiesFn())).setCoder(AvroCoder.of(MyEntity.class))
        .apply(AvroIO.Write.named("WriteEntitiesToGCS")
                           .to("gs://your-bucket/path/to/temp.avro")
                           .withSchema(MyEntity.class)
                           .withNumShards(1));
pipeline.run();

where MyEntity is your class representing entities of your kind, and ConvertEntitiesFn is a DoFn<DatastoreV1.Entity, MyEntity> that performs the conversion.
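
In case it helps, here is a minimal sketch of what those two pieces might look like. MyEntity, its single field, and the property name read out of the Datastore entity are all hypothetical placeholders; the accessors assume the v1beta2 DatastoreV1 protos and the SDK's DoFn/AvroCoder APIs used elsewhere in this answer.

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Property;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Hypothetical Avro-serializable representation of one entity of your kind.
@DefaultCoder(AvroCoder.class)
class MyEntity {
  String content;  // placeholder field - replace with your kind's actual properties
}

// Converts each Datastore Entity into a MyEntity so it can be written with AvroIO.
class ConvertEntitiesFn extends DoFn<Entity, MyEntity> {
  @Override
  public void processElement(ProcessContext c) {
    MyEntity out = new MyEntity();
    for (Property p : c.element().getPropertyList()) {
      if ("content".equals(p.getName())) {   // placeholder property name
        out.content = p.getValue().getStringValue();
      }
    }
    c.output(out);
  }
}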

Make sure to run this pipeline using DirectPipelineRunner (similarly to how writeDataToDatastore does it in the DatastoreWordCount.java example). This will bypass the query-splitting stage.
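
For completeness, forcing the local runner looks roughly like this (a sketch assuming the Dataflow SDK 1.x options API; you can equally pass --runner=DirectPipelineRunner on the command line):

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

// Run the dump pipeline locally so DatastoreIO reads sequentially and the
// broken query-splitting path is never exercised.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(DirectPipelineRunner.class);
// ...then Pipeline.create(options) as in the snippet above.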

The second pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(AvroIO.Read.named("ReadEntitiesFromGCS")
                          .from("gs://your-bucket/path/to/temp.avro")
                          .withSchema(MyEntity.class))
        .apply(the rest of your pipeline)

I realize this is not a great workaround. Please stay tuned for updates to the Datastore library.
