Issue with Google Dataflow using DatastoreIO
Problem description
I'm trying to read a Datastore table with 300k records from Dataflow (DatastoreIO), and I'm getting the following error from the Datastore API:

400 Bad Request
a composite filter must have at least one sub-filter

Code:
Query.Builder q = Query.newBuilder();
q.addKindBuilder().setName(options.getKind());
Query query = q.build();
(...)
Pipeline p = Pipeline.create(options);
p.apply(DatastoreIO.readFrom(options.getDataset(), query).named("ReadFromDatastore"))...
Error (happens 4 times before exit):

[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all <<<
[INFO]
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
Mar 14, 2015 8:58:48 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 41 files. Enable logging at DEBUG level to see which files will be staged.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 14, 2015 8:58:50 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 41 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 14, 2015 8:59:12 AM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 40 files cached
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source queryLatestStatisticsTimestamp
INFO: Query for latest stats timestamp of dataset primebus01 took 854ms
Mar 14, 2015 8:59:13 AM com.google.cloud.dataflow.sdk.io.DatastoreIO$Source getEstimatedSizeBytes
INFO: Query for per-kind statistics took 233ms
Dataflow SDK version: manual_build
Mar 14, 2015 8:59:16 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/primebus01/dataflow/job/2015-03-14_04_59_16-991787963429613914
Submitted job: 2015-03-14_04_59_16-991787963429613914
2015-03-14T11:59:17.796Z: (ab0c86e705e7447a): Expanding GroupByKey operations into optimizable parts.
2015-03-14T11:59:17.800Z: (ab0c86e705e749b0): Annotating graph with Autotuner information.
2015-03-14T11:59:17.806Z: (ab0c86e705e74ee6): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-14T11:59:17.812Z: (ab0c86e705e7441c): Fusing consumer GetContent into ReadFromDatastore
2015-03-14T11:59:17.815Z: (ab0c86e705e74952): Fusing consumer CountWords/Count.PerElement/Init into CountWords/ExtractWords
2015-03-14T11:59:17.818Z: (ab0c86e705e74e88): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Read
2015-03-14T11:59:17.820Z: (ab0c86e705e743be): Fusing consumer WriteLines into CountWords/FormatCounts
2015-03-14T11:59:17.822Z: (ab0c86e705e748f4): Fusing consumer CountWords/FormatCounts into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract
2015-03-14T11:59:17.824Z: (ab0c86e705e74e2a): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Write into CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput
2015-03-14T11:59:17.826Z: (ab0c86e705e74360): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/Extract into CountWords/Count.PerElement/Sum.PerKey/GroupedValues
2015-03-14T11:59:17.828Z: (ab0c86e705e74896): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput into CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial
2015-03-14T11:59:17.830Z: (ab0c86e705e74dcc): Fusing consumer CountWords/ExtractWords into GetContent
2015-03-14T11:59:17.832Z: (ab0c86e705e74302): Fusing consumer CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial into CountWords/Count.PerElement/Init
2015-03-14T11:59:17.843Z: (ab0c86e705e74d10): Adding StepResource setup and teardown to workflow graph.
2015-03-14T11:59:17.850Z: (ab0c86e705e7477c): Not adding lease related steps.
2015-03-14T11:59:17.864Z: (ac5ca5b613993974): Starting the input generators.
2015-03-14T11:59:17.882Z: (9a0f95eb7a7962f5): Adding workflow start and stop steps.
2015-03-14T11:59:17.884Z: (9a0f95eb7a796623): Assigning stage ids.
2015-03-14T11:59:18.290Z: (eb8131b6a76f5248): Starting worker pool setup.
2015-03-14T11:59:18.295Z: (eb8131b6a76f53ac): Starting 3 workers...
2015-03-14T11:59:18.318Z: S01: (1174d086003eadad): Executing operation CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Create
2015-03-14T11:59:18.345Z: (d91fb5c6a16bad02): Value "CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Session" materialized.
2015-03-14T11:59:18.367Z: S02: (1174d086003ea94c): Executing operation ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum....
2015-03-14T12:00:19.839Z: (9db26953adb2a181): java.io.IOException: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:266)
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.hasNext(BasicSerializableSourceFormat.java:239)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:173)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:120)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:129)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:94)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:118)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$1.call(DataflowWorkerHarness.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:599)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.start(DatastoreIO.java:590)
at com.google.cloud.dataflow.sdk.io.Source$WindowedReaderWrapper.start(Source.java:178)
at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$ReaderIterator.advanceInternal(BasicSerializableSourceFormat.java:257)
... 14 more
Caused by: com.google.api.services.datastore.client.DatastoreException: a composite filter must have at least one sub-filter
at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115)
at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81)
at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41)
at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.getIteratorAndMoveCursor(DatastoreIO.java:630)
at com.google.cloud.dataflow.sdk.io.DatastoreIO$DatastoreReader.advance(DatastoreIO.java:597)
... 17 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
a composite filter must have at least one sub-filter
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1054)
at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:78)
... 21 more
2015-03-14T12:00:24.850Z: (7ffa05cd66cd6940): Failed task is going to be retried.
................................
2015-03-14T12:01:02.703Z: (1174d086003eaff0): Executing failure step failure1
2015-03-14T12:01:02.707Z: (1174d086003ea342): Workflow failed. Causes: (ac5ca5b613993837): Map task completion for Step "ReadFromDatastore+GetContent+CountWords/ExtractWords+CountWords/Count.PerElement/Init+CountWords/Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly/Partial+CountWords/Count.PerElement/Sum.PerKey/GroupedValues/AddInput+CountWords/Count.PerElement/Sum...." failed. Causes: (9013e6edb4cda414): Task has been attempted unsuccessfully 4 times, the maximum allowed.
2015-03-14T12:01:02.742Z: (157f9c08481a25c5): Stopping the input generators.
2015-03-14T12:01:02.756Z: (697e8bf226b989af): Cleaning up.
2015-03-14T12:01:02.765Z: (697e8bf226b98f98): Tearing down pending resources...
2015-03-14T12:01:02.771Z: (697e8bf226b98581): Starting worker pool teardown.
2015-03-14T12:01:02.776Z: (697e8bf226b9880d): Stopping worker pool...
This brings up other questions:

1 - In this case, am I supposed to define a filter?
2 - What criteria does Dataflow use to split the jobs?
3 - Is there an easier way to dump a big table from Datastore?

Thanks.

Answer

I believe I have isolated the problems you're having:

- There is a bug in the way DatastoreIO splits queries (we use QuerySplitter; in some cases - in particular, when a query returns very few results - it can produce invalid queries for the parallel shards, containing empty composite filters). I have notified the Datastore team about this bug, and they are working on it.
- Judging from your code snippet, your query simply reads all entities of a particular kind from the dataset. The only way it could trigger the problem above is if it returns zero results, which is possible if you are using namespaces - unfortunately, QuerySplitter does not currently support namespaces either. The team is considering removing this limitation, but there is no public timeline.
- The Dataflow SDK error message should be much clearer about what happened, and the SDK should detect such errors earlier. I will fix that.

For now, if you are using namespaces, it seems that you're out of luck in terms of parallelizing the reads from Datastore :( If the bulk of your processing happens after that (i.e. if you do a non-trivial amount of work per Datastore entity), your current best bet is to write an (essentially non-parallel) pipeline that dumps the entities to GCS, and a second pipeline that reads them from there and processes them in parallel.

The first pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(DatastoreIO.readFrom(dataset, query).named("ReadFromDatastore"))
        .apply(ParDo.of(new ConvertEntitiesFn())).setCoder(AvroCoder.of(MyEntity.class))
        .apply(AvroIO.Write.named("WriteEntitiesToGCS")
            .to("gs://your-bucket/path/to/temp.avro")
            .withSchema(MyEntity.class)
            .withNumShards(1));
where MyEntity is your class to represent your kind of entities, and ConvertEntitiesFn is a DoFn<DatastoreV1.Entity, MyEntity> doing the conversion.
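For concreteness, here is a minimal sketch of what such a conversion might look like. This is not from the original answer: it assumes the 1.x Dataflow SDK and a hypothetical MyEntity with a single string property named "content"; adjust both to your kind's actual schema.

import com.google.api.services.datastore.DatastoreV1;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

// Hypothetical Avro-codable POJO, nested inside your pipeline class.
// AvroCoder requires a no-arg constructor.
@DefaultCoder(AvroCoder.class)
static class MyEntity {
  String content;

  MyEntity() {}

  MyEntity(String content) {
    this.content = content;
  }
}

// Scans the Datastore entity's properties and emits the POJO.
static class ConvertEntitiesFn extends DoFn<DatastoreV1.Entity, MyEntity> {
  @Override
  public void processElement(ProcessContext c) {
    for (DatastoreV1.Property property : c.element().getPropertyList()) {
      if ("content".equals(property.getName())) {
        c.output(new MyEntity(property.getValue().getStringValue()));
      }
    }
  }
}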
Make sure to run this pipeline using DirectPipelineRunner (similarly to how writeDataToDatastore does it in the DatastoreWordCount.java example). This will bypass the query splitting stage.
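As a rough sketch (assuming the standard 1.x SDK option mechanisms; not part of the original answer), the runner can be selected either on the command line or in code:

// On the command line:
//   --runner=DirectPipelineRunner
// Or programmatically, before constructing the pipeline:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectPipelineRunner.class);
Pipeline pipeline = Pipeline.create(options);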
And the second pipeline would look something like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(AvroIO.Read.named("ReadEntitiesFromGCS")
        .from("gs://your-bucket/path/to/temp.avro")
        .withSchema(MyEntity.class))
    .apply(the rest of your pipeline)

I realize that this is not a great work-around. Please stay tuned for updates to the Datastore library.