Dataflow + Datastore = DatastoreException: I/O error

Question

I'm trying to write to Datastore from Dataflow using com.google.cloud.datastore.

My code looks like this (inspired by the examples in [1]):

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.PathElement;
import com.google.cloud.datastore.testing.LocalDatastoreHelper;

public void processElement(ProcessContext c) {
    LocalDatastoreHelper HELPER = LocalDatastoreHelper.create(1.0);
    Datastore datastore = HELPER.options().toBuilder().namespace("ghijklmnop").build().service();
    Key taskKey = datastore.newKeyFactory()
        .ancestors(PathElement.of("TaskList", "default"))
        .kind("Task")
        .newKey("sampleTask");
    Entity task = Entity.builder(taskKey)
        .set("category", "Personal")
        .set("done", false)
        .set("priority", 4)
        .set("description", "Learn Cloud Datastore")
        .build();
    datastore.put(task);
}

I'm getting this error:

exception: "java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: com.google.cloud.datastore.DatastoreException: I/O error
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.sideOutputWindowedValue(DoFnRunnerBase.java:314)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.sideOutput(DoFnRunnerBase.java:470)
at com.google.cloud.dataflow.sdk.transforms.Partition$PartitionDoFn.processElement(Partition.java:172)

I have tried to use the DatastoreIO sink, but it looks like it is not currently supported in the streaming runner.

How can I avoid that error? Or what's the best way to write from Dataflow to Datastore?

[1] https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/datastore/src/main/java/com/google/datastore/snippets/Concepts.java

Answer

Following @Sam McVeety's advice, I tried to isolate my Datastore code outside of Dataflow, and indeed I got the same error!

But this also allowed me to see the cause of the exception, which I hadn't seen in the Dataflow logs:

Caused by: java.net.ConnectException: Connection refused

The clue is in the import I was using: com.google.cloud.datastore.testing.LocalDatastoreHelper.

It's a test helper that essentially mocks the Datastore API locally. Oops.
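
For context, here is a minimal sketch of how LocalDatastoreHelper is meant to be used, namely starting and stopping a local Datastore emulator around a unit test rather than inside a pipeline. This is my own illustration, assuming the gcloud-java testing API from the same era as the code above; method names may differ in later releases.

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.testing.LocalDatastoreHelper;

public class LocalDatastoreSketch {
    public static void main(String[] args) throws Exception {
        // Start a local, in-memory Datastore emulator (intended for tests only).
        LocalDatastoreHelper helper = LocalDatastoreHelper.create(1.0);
        helper.start();

        // This client talks to the local emulator, not to Cloud Datastore.
        Datastore datastore = helper.options().service();
        Key key = datastore.newKeyFactory().kind("Task").newKey("sampleTask");
        datastore.put(Entity.builder(key).set("done", false).build());
        System.out.println(datastore.get(key));

        // Shut the emulator down. Outside a test JVM (e.g. on a Dataflow worker)
        // there is no emulator listening, hence the "Connection refused".
        helper.stop();
    }
}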

So this is the code I have now, after some local debugging:

import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.DateTime;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;
import com.google.cloud.datastore.StringValue;

public void processElement(ProcessContext c) {
    // Connects to the real Cloud Datastore service using the default project and credentials.
    final Datastore datastore = DatastoreOptions.defaultInstance().service();
    final KeyFactory keyFactory = datastore.newKeyFactory().kind("Task");

    // Let Datastore allocate the numeric ID for the new entity.
    Key key = datastore.allocateId(keyFactory.newKey());
    Entity task = Entity.builder(key)
        .set("description", StringValue.builder(":D").excludeFromIndexes(true).build())
        .set("created", DateTime.now())
        .set("done", false)
        .build();
    datastore.put(task);
}

The main difference is that

LocalDatastoreHelper.create(1.0).options().toBuilder().namespace("ghijklmnop").build().service()

becomes

DatastoreOptions.defaultInstance().service();
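
For completeness, here is a rough sketch of how such a DoFn could be wired into a Dataflow 1.x pipeline. The class names, the sample input from Create.of, and the per-bundle client handling are my own assumptions, not part of the original answer; on the streaming runner the input would come from an unbounded source instead.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.KeyFactory;

public class WriteTasksPipeline {

    // Hypothetical DoFn wrapping the Datastore write shown in the answer.
    static class WriteTaskFn extends DoFn<String, Void> {
        private transient Datastore datastore;
        private transient KeyFactory keyFactory;

        @Override
        public void startBundle(Context c) {
            // Create the client once per bundle instead of once per element.
            datastore = DatastoreOptions.defaultInstance().service();
            keyFactory = datastore.newKeyFactory().kind("Task");
        }

        @Override
        public void processElement(ProcessContext c) {
            Key key = datastore.allocateId(keyFactory.newKey());
            Entity task = Entity.builder(key)
                .set("description", c.element())
                .set("done", false)
                .build();
            datastore.put(task);
        }
    }

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("Learn Cloud Datastore", "Write a Dataflow pipeline"))
         .apply(ParDo.of(new WriteTaskFn()));
        p.run();
    }
}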
